# Source path: mimictalk/data_gen/utils/process_video/inpaint_torso_imgs.py
# Uploaded by: mrbear1024
# Commit message: init project
# Commit: 8eb4303
import glob
import os

import cv2
import numpy as np
import tqdm
from scipy.ndimage import binary_erosion, binary_dilation

from tasks.eg3ds.loss_utils.segment_loss.mp_segmenter import MediapipeSegmenter
from utils.commons.multiprocess_utils import multiprocess_run_tqdm
# Module-level segmenter instance, constructed once at import time and used by
# inpaint_torso_job for every frame.
seg_model = MediapipeSegmenter()
def _read_image_or_fail(path):
    """Read `path` with cv2 (channels unchanged) and raise if it cannot be loaded."""
    image = cv2.imread(path, cv2.IMREAD_UNCHANGED)
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"cannot read image: {path}")
    return image


def _column_tops(part, head_part):
    """Find the topmost pixel of every column of `part` and whether head lies right above it.

    Returns a tuple (top_coords, counts, under_head):
      top_coords: [m, 2] integer (row, col) of the topmost `part` pixel per occupied column.
      counts:     [m] number of `part` pixels in each of those columns.
      under_head: [m] bool, True where the pixel directly above is set in `head_part`.
    """
    coords = np.stack(np.nonzero(part), axis=-1)  # [M, 2] as (row, col)
    # lexsort uses the LAST key as primary: sort by column, then by row, so the
    # first entry of every column is its topmost pixel.
    order = np.lexsort((coords[:, 0], coords[:, 1]))
    coords = coords[order]
    _, first_idx, counts = np.unique(coords[:, 1], return_index=True, return_counts=True)
    top_coords = coords[first_idx]

    rows_above = top_coords[:, 0] - 1
    # A pixel in row 0 has nothing above it; indexing with -1 would silently
    # wrap around to the last image row, so treat it as "not under the head".
    in_bounds = rows_above >= 0
    under_head = np.zeros(len(top_coords), dtype=bool)
    under_head[in_bounds] = head_part[rows_above[in_bounds], top_coords[in_bounds, 1]]
    return top_coords, counts, under_head


def _inpaint_upward(src_image, dst_image, top_coords, length):
    """Smear the color of each pixel in `top_coords` vertically upward into `dst_image`.

    Colors are sampled from `src_image` at `top_coords` and written to `length`
    consecutive rows (starting at the pixel itself and moving up), darkened by a
    factor of 0.98 per row. Rows that would fall above the image are skipped.
    Returns an [H, W] bool mask of the pixels that were written.
    """
    steps = np.arange(length)
    colors = src_image[tuple(top_coords.T)]  # [m, C]
    rows = top_coords[None, :, 0] - steps[:, None]  # [L, m]
    cols = np.broadcast_to(top_coords[None, :, 1], rows.shape)  # [L, m]
    shaded = colors[None] * (0.98 ** steps).reshape(length, 1, 1)  # [L, m, C]

    # Negative rows would wrap around and paint the bottom of the image.
    valid = rows >= 0
    painted = np.zeros(dst_image.shape[:2], dtype=bool)
    dst_image[rows[valid], cols[valid]] = shaded[valid]
    painted[rows[valid], cols[valid]] = True
    return painted


def inpaint_torso_job(video_name, idx=None, total=None):
    """Build a head-free torso RGBA image for every frame of one video.

    Frames are globbed as ``*.jpg`` from the directory obtained by replacing
    ``/video/`` with ``/gt_imgs/`` in `video_name` and dropping ``.mp4``. For each
    frame the segmenter output is used to composite the background image over
    the background region, remove the head, and in-paint the area above the
    torso and neck by smearing their top pixels upward.

    Args:
        video_name: path of the source video; only used to derive the frame directory.
        idx: unused in the body; kept so callers can pass (video_name, idx, total).
        total: unused in the body; kept for the same reason as `idx`.

    Raises:
        FileNotFoundError: if a frame or its background image cannot be read.
    """
    raw_img_dir = video_name.replace(".mp4", "").replace("/video/", "/gt_imgs/")
    img_names = glob.glob(os.path.join(raw_img_dir, "*.jpg"))

    # Structuring element that grows a mask along the vertical axis only.
    vertical_structure = np.array([[0, 1, 0], [0, 1, 0], [0, 1, 0]], dtype=bool)

    for image_path in tqdm.tqdm(img_names):
        # read ori image (BGR as loaded by cv2; converted to RGB for the segmenter)
        ori_image = _read_image_or_fail(image_path)  # [H, W, 3]
        segmap = seg_model._cal_seg_map(cv2.cvtColor(ori_image, cv2.COLOR_BGR2RGB))
        # assumes segmap[k] is an [H, W] per-class mask — TODO confirm against MediapipeSegmenter
        # `bool` instead of `np.bool`: the alias was removed in NumPy 1.24.
        head_part = (segmap[1] + segmap[3] + segmap[5]).astype(bool)
        torso_part = segmap[4].astype(bool)
        neck_part = segmap[2].astype(bool)
        bg_part = segmap[0].astype(bool)
        bg_image = _read_image_or_fail(image_path.replace("/gt_imgs/", "/bg_imgs/"))  # [H, W, 3]

        # get gt image: original frame with the background region replaced
        gt_image = ori_image.copy()
        gt_image[bg_part] = bg_image[bg_part]
        # NOTE(review): image_path is globbed from the gt_imgs directory, so this
        # replace looks like a no-op and the composite overwrites the input
        # frame — confirm this is intended.
        cv2.imwrite(image_path.replace('ori_imgs', 'gt_imgs'), gt_image)

        # get torso image: start from the composite and blank out the head
        torso_image = gt_image.copy()
        torso_image[head_part] = 0
        torso_alpha = 255 * np.ones((gt_image.shape[0], gt_image.shape[1], 1), dtype=np.uint8)

        # torso part "vertical" in-painting: extend torso columns that touch
        # the head upward by 8 rows (plus the top pixel itself).
        top_torso, _, torso_under_head = _column_tops(torso_part, head_part)
        # With no torso pixel under the head this paints nothing and the mask
        # stays all-False.
        inpaint_torso_mask = _inpaint_upward(
            gt_image, torso_image, top_torso[torso_under_head], 8 + 1
        )

        # neck part "vertical" in-painting
        push_down = 4
        neck_length = 48 + push_down + 1
        neck_part = binary_dilation(neck_part, structure=vertical_structure, iterations=3)
        top_neck, neck_counts, neck_under_head = _column_tops(neck_part, head_part)
        top_neck = top_neck[neck_under_head]  # boolean indexing yields a copy
        # push these tops down by up to `push_down` pixels (never below the
        # column's own extent) to make the neck inpainting look more natural
        offset_down = np.minimum(neck_counts[neck_under_head] - 1, push_down)
        top_neck[:, 0] += offset_down
        inpaint_mask = _inpaint_upward(gt_image, torso_image, top_neck, neck_length)

        # apply blurring to the inpainted neck area to avoid vertical-line artifacts
        # NOTE(review): the third positional argument of cv2.GaussianBlur is
        # sigmaX, so cv2.BORDER_DEFAULT acts as the sigma value here, not as a
        # border mode. Kept unchanged to preserve the existing output.
        blur_img = cv2.GaussianBlur(torso_image.copy(), (5, 5), cv2.BORDER_DEFAULT)
        torso_image[inpaint_mask] = blur_img[inpaint_mask]

        # set mask: everything outside neck / torso / inpainted pixels is cleared
        keep = neck_part | torso_part | inpaint_mask | inpaint_torso_mask
        torso_image[~keep] = 0
        torso_alpha[~keep] = 0
        # NOTE(review): every frame is written to the same "0.png" in the
        # current working directory, so only the last frame survives — this
        # looks like a debugging leftover; confirm the intended output path.
        cv2.imwrite("0.png", np.concatenate([torso_image, torso_alpha], axis=-1))

    print('[INFO] ===== extracted torso and gt images =====')
def out_exist_job(vid_name):
    """Tell whether the outputs for `vid_name` still need to be produced.

    The four output directories are derived by replacing ``/video/`` in
    `vid_name` and stripping ``.mp4``. Returns None when all of them exist and
    contain the same number of entries; otherwise returns `vid_name` unchanged.
    """
    replacements = (
        "/inpaint_torso_imgs/",
        "/inpaint_torso_with_bg_imgs/",
        "/torso_imgs/",
        "/torso_with_bg_imgs/",
    )
    out_dirs = [vid_name.replace("/video/", target).replace(".mp4", "") for target in replacements]

    if not all(os.path.exists(path) for path in out_dirs):
        return vid_name

    # NOTE(review): the reference count is taken from the first OUTPUT
    # directory, not from the source frames — confirm this is the intended
    # completeness criterion.
    expected = len(os.listdir(out_dirs[0]))
    if all(len(os.listdir(path)) == expected for path in out_dirs[1:]):
        return None
    return vid_name
def get_todo_vid_names(vid_names):
    """Return the entries of `vid_names` for which out_exist_job reports work left to do.

    out_exist_job is dispatched over `vid_names` through multiprocess_run_tqdm
    with 16 workers; each yielded item is unpacked as a pair whose second
    element is the job result, and None results (outputs already present) are
    dropped.
    """
    job_results = multiprocess_run_tqdm(out_exist_job, vid_names, num_workers=16)
    return [pending for _, pending in job_results if pending is not None]
if __name__ == '__main__':
    # Script-only imports; glob and tqdm are also used by inpaint_torso_job above.
    import argparse, glob, tqdm, random
    # Command-line options for the batch pipeline.
    # NOTE(review): parse_args() is commented out below, so none of these
    # options currently has any effect.
    parser = argparse.ArgumentParser()
    parser.add_argument("--vid_dir", default='/home/tiger/datasets/raw/CelebV-HQ/video')
    parser.add_argument("--ds_name", default='CelebV-HQ')
    parser.add_argument("--num_workers", default=48, type=int)
    parser.add_argument("--seed", default=0, type=int)
    parser.add_argument("--process_id", default=0, type=int)
    parser.add_argument("--total_process", default=1, type=int)
    parser.add_argument("--reset", action='store_true')
    # Debug entry point: processes a single hard-coded video instead of the
    # dataset selected by the arguments above.
    inpaint_torso_job('/home/tiger/datasets/raw/CelebV-HQ/video/dgdEr-mXQT4_8.mp4')
    # Disabled batch pipeline, kept verbatim for reference (its original
    # indentation is not preserved in these comments).
    # args = parser.parse_args()
    # vid_dir = args.vid_dir
    # ds_name = args.ds_name
    # if ds_name in ['lrs3_trainval']:
    # mp4_name_pattern = os.path.join(vid_dir, "*/*.mp4")
    # if ds_name in ['TH1KH_512', 'CelebV-HQ']:
    # vid_names = glob.glob(os.path.join(vid_dir, "*.mp4"))
    # elif ds_name in ['lrs2', 'lrs3', 'voxceleb2']:
    # vid_name_pattern = os.path.join(vid_dir, "*/*/*.mp4")
    # vid_names = glob.glob(vid_name_pattern)
    # vid_names = sorted(vid_names)
    # random.seed(args.seed)
    # random.shuffle(vid_names)
    # process_id = args.process_id
    # total_process = args.total_process
    # if total_process > 1:
    # assert process_id <= total_process -1
    # num_samples_per_process = len(vid_names) // total_process
    # if process_id == total_process:
    # vid_names = vid_names[process_id * num_samples_per_process : ]
    # else:
    # vid_names = vid_names[process_id * num_samples_per_process : (process_id+1) * num_samples_per_process]
    # if not args.reset:
    # vid_names = get_todo_vid_names(vid_names)
    # print(f"todo videos number: {len(vid_names)}")
    # fn_args = [(vid_name,i,len(vid_names)) for i, vid_name in enumerate(vid_names)]
    # for vid_name in multiprocess_run_tqdm(inpaint_torso_job ,fn_args, desc=f"Root process {args.process_id}: extracting segment images", num_workers=args.num_workers):
    # pass