File size: 11,716 Bytes
e7334c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 |
import numpy as np
from tqdm import tqdm
import cv2, imageio, ffmpeg, os, time, shutil
def VidInfo(vid_path):
'''
returns a dictonary of 'duration', 'fps', 'frame_count', 'frame_height', 'frame_width',
'format', 'fourcc'
'''
vcap = cv2.VideoCapture(vid_path)
if not vcap.isOpened():
# cannot read video
if vid_path.startswith('https://'):
# likely a ffmpeg without open-ssl support issue
# https://github.com/opencv/opencv-python/issues/204
return VidInfo(vid_path.replace('https://','http://'))
else:
return None
info_dict = {
'fps' : round(vcap.get(cv2.CAP_PROP_FPS),2), #int(vcap.get(cv2.CAP_PROP_FPS)),
'frame_count': int(vcap.get(cv2.CAP_PROP_FRAME_COUNT)), # number of frames should integars
'duration': round(
int(vcap.get(cv2.CAP_PROP_FRAME_COUNT)) / vcap.get(cv2.CAP_PROP_FPS),
2), # round number of seconds to 2 decimals
'frame_height': vcap.get(cv2.CAP_PROP_FRAME_HEIGHT),
'frame_width': vcap.get(cv2.CAP_PROP_FRAME_WIDTH),
'format': vcap.get(cv2.CAP_PROP_FORMAT),
'fourcc': vcap.get(cv2.CAP_PROP_FOURCC)
}
vcap.release()
return info_dict
def VidReader(vid_path, verbose = False, use_imageio = True):
'''
given a video file path, returns a list of images
Args:
vid_path: a MP4 file path
use_imageio: if true, function returns a ImageIO reader object (RGB);
otherwise, a list of CV2 array will be returned
'''
if use_imageio:
vid = imageio.get_reader(vid_path, 'ffmpeg')
return vid
vcap = cv2.VideoCapture(vid_path)
s_time = time.time()
# try to determine the total number of frames in Vid
frame_count = int(vcap.get(cv2.CAP_PROP_FRAME_COUNT))
frame_rate = int(vcap.get(cv2.CAP_PROP_FPS))
if verbose:
print(f'\t{frame_count} total frames in video {vid_path}')
print(f'\t\t FPS: {frame_rate}')
print(f'\t\t Video Duration: {frame_count/ frame_rate}s')
# loop over frames
results = []
for i in tqdm(range(frame_count)):
grabbed, frame = vcap.read()
if grabbed:
results.append(frame)
# Output
r_time = "{:.2f}".format(time.time() - s_time)
if verbose:
print(f'\t{vid_path} loaded in {r_time} ({frame_count/float(r_time)} fps)')
vcap.release()
return results
def get_vid_frame(n, vid_path):
'''
return frame(s) in np.array specified by i
Args:
n: list of int
'''
vreader = VidReader(vid_path, verbose = False, use_imageio = True)
fcount = VidInfo(vid_path)['frame_count']
if type(n) == list:
return [vreader.get_data(i) if i in range(fcount) else None for i in n]
elif type(n) == int:
return vreader.get_data(n) if n in range(fcount) else None
else:
raise ValueError(f'n must be either int or list, {type(n)} detected.')
def vid_slicer(vid_path, output_path, start_frame, end_frame, keep_audio = False, overwrite = False):
'''
ref https://github.com/kkroening/ffmpeg-python/issues/184#issuecomment-493847192
'''
if not( os.path.isdir(os.path.dirname(output_path))):
raise ValueError(f'output_path directory does not exists: {os.path.dirname(output_path)}')
if os.path.isfile(output_path) and not overwrite:
warnings.warn(f'{output_path} already exists but overwrite switch is False, nothing done.')
return None
input_vid = ffmpeg.input(vid_path)
vid_info = VidInfo(vid_path)
end_frame += 1
if keep_audio:
vid = (
input_vid
.trim(start_frame = start_frame, end_frame = end_frame)
.setpts('PTS-STARTPTS')
)
aud = (
input_vid
.filter_('atrim', start = start_frame / vid_info['fps'], end = end_frame / vid_info['fps'])
.filter_('asetpts', 'PTS-STARTPTS')
)
joined = ffmpeg.concat(vid, aud, v = 1, a =1).node
output = ffmpeg.output(joined[0], joined[1], f'{output_path}').overwrite_output()
output.run()
else:
(
input_vid
.trim (start_frame = start_frame, end_frame = end_frame )
.setpts ('PTS-STARTPTS')
.output (f'{output_path}')
.overwrite_output()
.run()
)
return output_path
def vid_resize(vid_path, output_path, width, overwrite = False):
'''
use ffmpeg to resize the input video to the width given, keeping aspect ratio
'''
if not( os.path.isdir(os.path.dirname(output_path))):
raise ValueError(f'output_path directory does not exists: {os.path.dirname(output_path)}')
if os.path.isfile(output_path) and not overwrite:
warnings.warn(f'{output_path} already exists but overwrite switch is False, nothing done.')
return None
input_vid = ffmpeg.input(vid_path)
vid = (
input_vid
.filter('scale', width, -1)
.output(output_path)
.overwrite_output()
.run()
)
return output_path
def vid_reduce_framerate(vid_path, output_path, new_fps, overwrite = False):
'''
use ffmpeg to resize the input video to the width given, keeping aspect ratio
'''
if not( os.path.isdir(os.path.dirname(output_path))):
raise ValueError(f'output_path directory does not exists: {os.path.dirname(output_path)}')
if os.path.isfile(output_path) and not overwrite:
warnings.warn(f'{output_path} already exists but overwrite switch is False, nothing done.')
return None
input_vid = ffmpeg.input(vid_path)
vid = (
input_vid
.filter('fps', fps = new_fps, round = 'up')
.output(output_path)
.overwrite_output()
.run()
)
return output_path
def seek_frame_count(VidReader, cv2_frame_count, guess_within = 0.1,
seek_rate = 1, bDebug = False):
'''
imageio/ffmpeg frame count could be different than cv2. this function
returns the true frame count in the given vid reader. Returns None if frame
count can't be determined
Args:
VidReader: ImageIO video reader object with method .get_data()
cv2_frame_count: frame count from cv2
guess_within: look for actual frame count within X% of cv2_frame_count
'''
max_guess = int(cv2_frame_count * (1-guess_within))
seek_rate = max(seek_rate, 1)
pbar = reversed(range(max_guess, cv2_frame_count, seek_rate))
if bDebug:
pbar = tqdm(pbar, desc = f'seeking frame')
print(f'seeking from {max_guess} to {cv2_frame_count} with seek_rate of {seek_rate}')
for i in pbar:
try:
im = VidReader.get_data(i)
except IndexError:
if bDebug:
print(f'{i} not found.')
continue
# Frame Found
if i+1 == cv2_frame_count:
print(f'seek_frame_count: found frame count at {i+1}')
return i + 1
else:
return seek_frame_count(VidReader, cv2_frame_count = i + seek_rate,
guess_within= seek_rate / (i + seek_rate),
seek_rate= int(seek_rate/2),
bDebug = bDebug)
return None
def VidWriter(lFrames, output_path, strFourcc = 'MP4V', verbose = False, intFPS = 20, crf = None,
use_imageio = False):
'''
Given a list of images in numpy array format, it outputs a MP4 file
Args:
lFrames: list of numpy arrays or filename
output_path: a MP4 file path
strFourcc: four letter video codec; XVID is more preferable. MJPG results in high size video. X264 gives very small size video; see https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_gui/py_video_display/py_video_display.html
crf: Constant Rate Factor for ffmpeg video compression
'''
s_time = time.time()
if not output_path.endswith('.mp4'):
raise ValueError(f'VidWriter: only mp4 video output supported.')
if crf:
crf = int(crf)
if crf > 24 or crf < 18:
raise ValueError(f'VidWriter: crf must be between 18 and 24')
if not os.path.exists(os.path.dirname(output_path)):
output_dir = os.path.dirname(output_path)
print(f'\t{output_dir} does not exist.\n\tCreating video file output directory: {output_dir}')
os.makedirs(output_dir)
if use_imageio:
writer = imageio.get_writer(output_path, fps = intFPS)
for frame in tqdm(lFrames, desc = "Writing video using ImageIO"):
if not type(frame) == np.ndarray:
# read from filename
if not os.path.isfile(frame):
raise ValueError(f'VidWriter: lFrames must be list of images (np.array) or filenames')
frame = imageio.imread(frame)
writer.append_data(frame)
writer.close()
else:
#init OpenCV Vid Writer:
H , W = lFrames[0].shape[:2]
#fourcc = cv2.VideoWriter_fourcc(*'MP4V')
fourcc = cv2.VideoWriter_fourcc(*strFourcc)
if verbose:
print(f'\tEncoding using fourcc: {strFourcc}')
writer = cv2.VideoWriter(output_path, fourcc, fps = intFPS, frameSize = (W, H), isColor = True)
for frame in tqdm(lFrames, desc = "Writing video using OpenCV"):
writer.write(frame)
writer.release()
# Output
r_time = "{:.2f}".format( max(time.time() - s_time, 0.01))
if verbose:
print(f'\t{output_path} written in {r_time} ({len(lFrames)/float(r_time)} fps)')
if crf:
if verbose:
print(f'\tCompressing {output_path} with FFmpeg using crf: {crf}')
isCompressed = VidCompress(output_path, crf = crf, use_ffmpy = False)
if verbose:
print(f'\tCompressed: {isCompressed}')
return output_path
def im_dir_to_video(im_dir, output_path, fps, tup_im_extension = ('.jpg'),
max_long_edge = 600, filename_len = 6, pixel_format = 'yuv420p',
tqdm_func = tqdm):
'''turn a directory of images into video using ffmpeg
ref: https://github.com/kkroening/ffmpeg-python/issues/95#issuecomment-401428324
Args:
pixel_format: for list of supported formats see https://en.wikipedia.org/wiki/FFmpeg#Pixel_formats
filename_len: ensure frame number are zero padded; 0 will skip this step
'''
if filename_len:
# Ensure Filenames are Zero padded
l_im_fp = [f for f in os.listdir(im_dir) if f.endswith(tup_im_extension)]
l_im_fp = sorted(l_im_fp, key = lambda f: int(f.split('.')[0]))
for f in tqdm_func(l_im_fp, desc = 'ensuring image filenames are zero padded'):
fname, fext = os.path.splitext(f)
padded_f = fname.zfill(filename_len) + fext
if not os.path.isfile(os.path.join(im_dir,padded_f)):
shutil.move(os.path.join(im_dir, f), os.path.join(im_dir, padded_f))
# removed symlink to f as it will duplicate the frames in video generation
# os.symlink(src = os.path.join(im_dir, padded_f), dst = os.path.join(im_dir, f))
#TODO: ensure image size are divisible by 2
im_dir += '' if im_dir.endswith('/') else '/'
im_stream_string = f'{im_dir}*.jpg'
# we need to escape special characters
im_stream_string = im_stream_string.translate(
str.maketrans(
{'[': r'\[',
']': r'\]'})
)
r = (
ffmpeg
.input(im_stream_string, pattern_type = 'glob', framerate=fps)
.filter('format', pixel_format)
# .filter('pad', 'ceil(iw/2)*2:ceil(ih/2)*2')
.output(output_path)
.run()
)
return output_path
#
# def VidCompress(input_path, output_path = None, crf = 24, use_ffmpy = False):
# '''
# Compress input_path video (mp4 only) using ffmpy
# crf: Constant Rate Factor for ffmpeg video compression, must be between 18 and 24
# use_ffmpy: use ffmpy instead of commandline call to ffmpeg
# '''
# if not input_path.endswith('.mp4'):
# print(f'\tFATAL: only mp4 videos supported.')
# return None
#
# output_fname = output_path if output_path else input_path
# tmp_fname = input_path.replace(".mp4","_tmp.mp4")
# os.rename(input_path, tmp_fname)
#
# try:
# if not use_ffmpy:
# #os.popen(f'ffmpeg -i {tmp_fname} -vcodec libx264 -crf {crf} {output_fname}')
#
# cmdOut = subprocess.Popen(['ffmpeg', '-i', tmp_fname, '-vcodec', 'libx264', '-crf', str(crf), output_fname],
# stdout = subprocess.PIPE,
# stderr = subprocess.STDOUT)
# stdout, stderr = cmdOut.communicate()
# if not stderr:
# os.remove(tmp_fname)
# return True
# else:
# return False
# else:
# ff = FFmpeg(
# inputs = {tmp_fname : None},
# outputs = {output_fname : f'-vcodec libx264 -crf {crf}'}
# )
# ff.run()
#
# os.remove(tmp_fname)
# return True
#
# except OSError as e:
# print(f'\tWARNING: Compression Failed; OSError\n\tLikely out of RAM\n\tError Msg: {e}')
# os.rename(tmp_fname, output_fname)
# return False
|