Spaces:
Sleeping
Sleeping
| #! /usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| # Copyright 2020 Imperial College London (Pingchuan Ma) | |
| # Apache 2.0 (http://www.apache.org/licenses/LICENSE-2.0) | |
| """ Crop Mouth ROIs from videos for lipreading""" | |
| # from msilib.schema import File | |
| from ast import Pass | |
| import os | |
| import cv2 # OpenCV λΌμ΄λΈλ¬λ¦¬ | |
| import glob # 리λ μ€μ κ²½λ‘ νκΈ°λ²μ μ¬μ©νμ¬ μνλ ν΄λ/νμΌ λ¦¬μ€νΈ μ»μ | |
| import argparse # λͺ λ Ήν μΈμλ₯Ό νμ±ν΄μ£Όλ λͺ¨λ | |
| import numpy as np | |
| from collections import deque # collections λͺ¨λμ μλ λ°ν¬ λΆλ¬μ€κΈ° # λ°ν¬: μ€νκ³Ό νλ₯Ό ν©μΉ μλ£κ΅¬μ‘° | |
| from utils import * # utils.py λͺ¨λμ μλ λͺ¨λ ν¨μ λΆλ¬μ€κΈ° | |
| from transform import * # transform.py λͺ¨λμ μλ λͺ¨λ ν¨μ λΆλ¬μ€κΈ° | |
| import dlib # face landmark μ°Ύλ λΌμ΄λΈλ¬λ¦¬ | |
| import face_alignment # face landmark μ°Ύλ λΌμ΄λΈλ¬λ¦¬ | |
| from PIL import Image | |
# Parse and return the command-line arguments.
def load_args(default_config=None):
    """Build the pre-processing command-line parser and parse the arguments.

    :param default_config: accepted for interface compatibility; not used.
    :return: ``argparse.Namespace`` holding the parsed options.
    """
    cli = argparse.ArgumentParser(description='Lipreading Pre-processing')

    # (flag, add_argument keyword arguments), kept in registration order.
    option_table = (
        # -- utils
        ('--video-direc', dict(default=None, help='raw video directory')),
        ('--video-format', dict(default='.mp4', help='raw video format')),
        ('--landmark-direc', dict(default=None, help='landmark directory')),
        ('--filename-path', dict(default='./vietnamese_detected_face_30.csv',
                                 help='list of detected video and its subject ID')),
        ('--save-direc', dict(default=None, help='the directory of saving mouth ROIs')),
        # -- mean face utils
        ('--mean-face', dict(default='./20words_mean_face.npy', help='mean face pathname')),
        # -- mouthROIs utils
        ('--crop-width', dict(default=96, type=int, help='the width of mouth ROIs')),
        ('--crop-height', dict(default=96, type=int, help='the height of mouth ROIs')),
        ('--start-idx', dict(default=48, type=int, help='the start of landmark index')),
        ('--stop-idx', dict(default=68, type=int, help='the end of landmark index')),
        ('--window-margin', dict(default=12, type=int,
                                 help='window margin for smoothed_landmarks')),
        # -- convert to gray scale
        ('--convert-gray', dict(default=False, action='store_true', help='convert2grayscale')),
        # -- test set only
        ('--testset-only', dict(default=False, action='store_true',
                                help='process testing set only')),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)

    # Parsed values come back as a namespace (e.g. ``ns.crop_width``).
    return cli.parse_args()
args = load_args()  # parse the command-line options once, when the module is executed
# -- mean face utils
STD_SIZE = (256, 256)  # size argument handed to warp_img / apply_transform in crop_patch
mean_face_landmarks = np.load(args.mean_face)  # default path: ./20words_mean_face.npy
stablePntsIDs = [33, 36, 39, 42, 45]  # landmark rows used as reference points for the warp in crop_patch
# Crop the mouth region from a video using its landmarks.
def crop_patch(video_pathname, landmarks):
    """Crop the mouth patch from the frames of a video.

    Frames and their landmarks are buffered in a sliding window of
    ``args.window_margin`` entries. Whenever the window is full, the landmarks
    in it are averaged, the oldest frame is warped with ``warp_img`` using the
    averaged stable points, and the mouth patch is cut out. When the frame
    matching the last landmark is reached, the frames still buffered are
    warped with the most recent transform and cropped as well.

    :param str video_pathname: pathname of the video
    :param list landmarks: interpolated landmarks, one entry per frame
    :return: ``np.ndarray`` of the cropped patches, or ``None`` when the video
        ends before the frame of the last landmark is reached
    """
    q_frame, q_landmarks = deque(), deque()
    sequence = []
    trans = None  # transform returned by the latest warp_img call
    last_idx = len(landmarks) - 1

    def _cut_mouth(frame, points):
        # Cut the patch around the landmarks in [start_idx, stop_idx).
        return cut_patch(frame,
                         points[args.start_idx:args.stop_idx],
                         args.crop_height // 2,
                         args.crop_width // 2)

    # The original code annotates the frames from read_video as BGR.
    for frame_idx, frame in enumerate(read_video(video_pathname)):
        q_landmarks.append(landmarks[frame_idx])
        q_frame.append(frame)

        if len(q_frame) == args.window_margin:
            # Average the landmarks over the window to smooth out jitter.
            smoothed_landmarks = np.mean(q_landmarks, axis=0)
            cur_landmarks = q_landmarks.popleft()
            cur_frame = q_frame.popleft()
            # -- affine transformation
            trans_frame, trans = warp_img(smoothed_landmarks[stablePntsIDs, :],
                                          mean_face_landmarks[stablePntsIDs, :],
                                          cur_frame,
                                          STD_SIZE)
            trans_landmarks = trans(cur_landmarks)
            # -- crop mouth patch
            sequence.append(_cut_mouth(trans_frame, trans_landmarks))

        if frame_idx == last_idx:
            if q_frame and trans is None:
                # The clip is shorter than the window, so no transform has been
                # estimated yet. Previously this path raised UnboundLocalError;
                # estimate the transform from whatever frames are buffered.
                smoothed_landmarks = np.mean(q_landmarks, axis=0)
                _, trans = warp_img(smoothed_landmarks[stablePntsIDs, :],
                                    mean_face_landmarks[stablePntsIDs, :],
                                    q_frame[0],
                                    STD_SIZE)
            # Flush the remaining frames, reusing the latest transform.
            while q_frame:
                cur_frame = q_frame.popleft()
                # -- transform frame
                trans_frame = apply_transform(trans, cur_frame, STD_SIZE)
                # -- transform landmarks
                trans_landmarks = trans(q_landmarks.popleft())
                # -- crop mouth patch
                sequence.append(_cut_mouth(trans_frame, trans_landmarks))
            return np.array(sequence)

    return None
# Landmark interpolation.
def landmarks_interpolate(landmarks):
    """Fill in the frames whose landmarks are missing.

    Gaps between two detected frames are filled by ``linear_interpolate``;
    missing frames before the first / after the last detection are filled by
    repeating the nearest detected landmark. The list is modified in place.

    :param list landmarks: landmarks detected in raw videos (``None`` where
        detection failed)
    :return: the completed landmark list, or ``None`` if no frame has landmarks
    """
    def _detected_positions(items):
        # Indices of the frames that do have a landmark.
        return [pos for pos, item in enumerate(items) if item is not None]

    known = _detected_positions(landmarks)
    if len(known) == 0:
        return None

    # Walk over consecutive detected frames and bridge every interior gap.
    for left, right in zip(known, known[1:]):
        if right - left != 1:
            landmarks = linear_interpolate(landmarks, left, right)

    known = _detected_positions(landmarks)
    # -- Corner case: keep frames at the beginning or at the end failed to be detected.
    if known:
        first, last = known[0], known[-1]
        landmarks[:first] = [landmarks[first]] * first
        landmarks[last:] = [landmarks[last]] * (len(landmarks) - last)

    known = _detected_positions(landmarks)
    # Every frame must carry a landmark at this point.
    assert len(known) == len(landmarks), "not every frame has landmark"
    return landmarks
def get_yield(output_video):
    """Lazily yield the items of ``output_video`` one at a time, in order."""
    yield from output_video
# Read the list of "<filename>,<person id>" entries, one per line.
# The file is opened in a ``with`` block so the handle is closed right away.
with open(args.filename_path) as list_file:
    lines = list_file.read().splitlines()

# With --testset-only, keep only entries whose parent folder is named 'test'.
if args.testset_only:
    lines = [entry for entry in lines if entry.split('/')[-2] == 'test']
| # lines κ°μλ§νΌ λ°λ³΅λ¬Έ μ€ν | |
_DLIB_MODELS = {}


def _dlib_models():
    """Return the (face detector, landmark predictor) pair, loading it on first use."""
    if not _DLIB_MODELS:
        model_path = os.path.dirname(os.path.abspath(__file__)) + '/shape_predictor_68_face_landmarks.dat'
        _DLIB_MODELS['detector'] = dlib.get_frontal_face_detector()
        _DLIB_MODELS['predictor'] = dlib.shape_predictor(model_path)
    return _DLIB_MODELS['detector'], _DLIB_MODELS['predictor']


def get_face_landmark(img):
    """Find the face landmarks of the first face dlib detects in ``img``.

    The detector and the shape predictor are loaded once and reused; they were
    previously re-created for every frame. The unused resize ratios computed
    from ``img.shape`` were dropped: unpacking the shape into two names raised
    ValueError on colour (3-dimensional) frames.

    :param img: frame handed to the dlib detector
    :return: ``(face_landmark, eye_landmark)`` arrays; ``eye_landmark`` is
        rows 36..47 of ``face_landmark``. Both are all zeros when no face is
        detected.
    """
    detector_hog, landmark_predictor = _dlib_models()
    for dlib_rect in detector_hog(img, 1):
        points = landmark_predictor(img, dlib_rect)
        landmark = [(p.x, p.y) for p in points.parts()]
        # Only the first detected face is used.
        return np.array(landmark), np.array(landmark[36:48])

    # No face detected: return all-zero placeholders.
    landmark = [(0.0, 0.0)] * 68
    return np.array(landmark), np.array(landmark[36:48])


for filename_idx, line in enumerate(lines):
    # Each entry is "<filename>,<person id>".
    filename, person_id = line.split(',')
    print('idx: {} \tProcessing.\t{}'.format(filename_idx, filename))

    video_pathname = os.path.join(args.video_direc, filename+args.video_format)
    landmarks_pathname = os.path.join(args.landmark_direc, filename+'.npz')
    dst_pathname = os.path.join(args.save_direc, filename+'.npz')

    assert os.path.isfile(video_pathname), "File does not exist. Path input: {}".format(video_pathname)

    # No landmark file yet and the video is an mp4: detect landmarks with dlib
    # and write the landmark npz file ourselves.
    if not os.path.exists(landmarks_pathname) and video_pathname.split('.')[-1] == 'mp4':
        target_frames = 29  # number of frames to sample
        video = videoToArray(video_pathname, is_gray=args.convert_gray)
        output_video = frameAdjust(video, target_frames)  # frame sampling

        multi_sub_landmarks = []
        for frame_idx, frame in enumerate(get_yield(output_video)):
            print(f'\n ------------frame {frame_idx}------------ ')
            facial_landmarks, eye_landmarks = get_face_landmark(frame)
            person_landmarks = {
                'id': 0,
                'most_recent_fitting_scores': np.array([2.0, 2.0, 2.0]),
                'facial_landmarks': facial_landmarks,
                'roll': 7,
                'yaw': 3.5,
                'eye_landmarks': eye_landmarks,
                'fitting_scores_updated': True,
                'pitch': -0.05
            }
            # One entry per frame holding only this frame's person. The list
            # used to accumulate across frames, so index 0 always returned the
            # first frame's landmarks.
            multi_sub_landmarks.append(np.array([person_landmarks], dtype=object))

        multi_sub_landmarks = np.array(multi_sub_landmarks, dtype=object)
        save2npz(landmarks_pathname, data=multi_sub_landmarks)
        print('\n ------------ save npz ------------ \n')
    else:
        assert os.path.isfile(landmarks_pathname), "File does not exist. Path input: {}".format(landmarks_pathname)

    # Skip videos whose mouth ROIs were already saved.
    if os.path.exists(dst_pathname):
        continue

    # NOTE(security): allow_pickle=True executes pickled objects while loading;
    # only load landmark files from trusted sources.
    multi_sub_landmarks = np.load(landmarks_pathname, allow_pickle=True)['data']
    landmarks = [None] * len(multi_sub_landmarks)
    for frame_idx in range(len(landmarks)):
        try:
            landmarks[frame_idx] = multi_sub_landmarks[frame_idx][int(person_id)]['facial_landmarks'].astype(np.float64)
        except IndexError:
            # This person has no entry in this frame; leave it as None.
            continue

    # Only process the video when at least one frame carries a non-zero
    # landmark set (all zeros means no face was detected). The former list
    # comparison was true whenever the frame count differed from 68.
    has_detection = any(lm is not None and np.any(lm) for lm in landmarks)
    if has_detection:
        # -- pre-process landmarks: interpolate frames not being detected.
        preprocessed_landmarks = landmarks_interpolate(landmarks)
        if not preprocessed_landmarks:
            continue
        # -- crop
        sequence = crop_patch(video_pathname, preprocessed_landmarks)
        assert sequence is not None, "cannot crop from {}.".format(filename)
        # -- save
        # Either convert to gray or reverse the channel order (last axis).
        data = convert_bgr2gray(sequence) if args.convert_gray else sequence[..., ::-1]
        save2npz(dst_pathname, data=data)

print('Done.')