sam2 / toolbox /vid_utils.py
John Ho
init comit
e7334c8
raw
history blame
11.7 kB
import os
import shutil
import time
import warnings

import cv2
import ffmpeg
import imageio
import numpy as np
from tqdm import tqdm
def VidInfo(vid_path):
    '''
    Read basic metadata of a video through OpenCV.

    Args:
        vid_path: path or URL of the video

    Returns:
        a dictionary with keys 'fps', 'frame_count', 'duration',
        'frame_height', 'frame_width', 'format' and 'fourcc', or None when
        the video cannot be opened. 'duration' is in seconds, rounded to 2
        decimals; it is 0.0 when OpenCV reports a non-positive frame rate.
    '''
    vcap = cv2.VideoCapture(vid_path)
    try:
        if not vcap.isOpened():
            # cannot read video
            if isinstance(vid_path, str) and vid_path.startswith('https://'):
                # likely a ffmpeg without open-ssl support issue
                # https://github.com/opencv/opencv-python/issues/204
                # only the leading scheme is rewritten, so the retry cannot recurse again
                return VidInfo(vid_path.replace('https://', 'http://', 1))
            return None

        fps = vcap.get(cv2.CAP_PROP_FPS)
        frame_count = int(vcap.get(cv2.CAP_PROP_FRAME_COUNT))  # number of frames should be an integer
        # guard against a zero frame rate instead of raising ZeroDivisionError
        duration = round(frame_count / fps, 2) if fps > 0 else 0.0

        return {
            'fps': round(fps, 2),
            'frame_count': frame_count,
            'duration': duration,
            'frame_height': vcap.get(cv2.CAP_PROP_FRAME_HEIGHT),
            'frame_width': vcap.get(cv2.CAP_PROP_FRAME_WIDTH),
            'format': vcap.get(cv2.CAP_PROP_FORMAT),
            'fourcc': vcap.get(cv2.CAP_PROP_FOURCC),
        }
    finally:
        # release the capture on every path, including the failure ones
        vcap.release()
def VidReader(vid_path, verbose = False, use_imageio = True):
    '''
    Open a video for reading, either lazily (ImageIO) or fully in memory (OpenCV).

    Args:
        vid_path: a MP4 file path
        verbose: if true, print frame count, FPS, duration and loading speed
                (OpenCV path only)
        use_imageio: if true, function returns a ImageIO reader object (RGB);
                otherwise, a list of CV2 array will be returned

    Returns:
        an ImageIO reader when use_imageio is true, otherwise the list of
        frames that OpenCV managed to read
    '''
    if use_imageio:
        return imageio.get_reader(vid_path, 'ffmpeg')

    vcap = cv2.VideoCapture(vid_path)
    s_time = time.time()
    try:
        # try to determine the total number of frames in Vid
        frame_count = int(vcap.get(cv2.CAP_PROP_FRAME_COUNT))
        # keep the fractional frame rate (e.g. 29.97) so the duration is not skewed
        frame_rate = vcap.get(cv2.CAP_PROP_FPS)
        if verbose:
            print(f'\t{frame_count} total frames in video {vid_path}')
            print(f'\t\t FPS: {frame_rate}')
            if frame_rate > 0:
                print(f'\t\t Video Duration: {frame_count / frame_rate}s')
            else:
                print('\t\t Video Duration: unknown (no frame rate reported)')

        # loop over frames; reads that fail are skipped rather than stored
        results = []
        for _ in tqdm(range(frame_count)):
            grabbed, frame = vcap.read()
            if grabbed:
                results.append(frame)
    finally:
        vcap.release()

    # Output
    if verbose:
        # clamp the elapsed time so a very fast load cannot divide by zero
        elapsed = max(time.time() - s_time, 0.01)
        r_time = "{:.2f}".format(elapsed)
        print(f'\t{vid_path} loaded in {r_time} ({frame_count/float(r_time)} fps)')
    return results
def get_vid_frame(n, vid_path):
    '''
    Return the frame(s) of a video at the given index or indices.

    Args:
        n: a frame index (int) or a list/tuple of frame indices
        vid_path: path of the video to read

    Returns:
        the frame returned by the ImageIO reader for an int n, or a list of
        frames for a list/tuple n. Indices outside
        range(frame_count) yield None instead of raising.

    Raises:
        ValueError: if n is neither an int nor a list/tuple, or if the video
            information cannot be read
    '''
    is_sequence = isinstance(n, (list, tuple))
    # bool is a subclass of int but is not a meaningful frame index
    is_index = isinstance(n, (int, np.integer)) and not isinstance(n, bool)
    # validate before opening anything so bad input fails fast
    if not (is_sequence or is_index):
        raise ValueError(f'n must be either int or list, {type(n)} detected.')

    vreader = VidReader(vid_path, verbose = False, use_imageio = True)
    try:
        vid_info = VidInfo(vid_path)
        if vid_info is None:
            raise ValueError(f'get_vid_frame: cannot read video info from {vid_path}')
        valid_indices = range(vid_info['frame_count'])

        if is_sequence:
            return [vreader.get_data(i) if i in valid_indices else None for i in n]
        return vreader.get_data(n) if n in valid_indices else None
    finally:
        # the reader holds an open ffmpeg stream; always release it
        vreader.close()
def vid_slicer(vid_path, output_path, start_frame, end_frame, keep_audio = False, overwrite = False):
    '''
    Cut the frames start_frame..end_frame (inclusive) out of a video with ffmpeg.

    ref https://github.com/kkroening/ffmpeg-python/issues/184#issuecomment-493847192

    Args:
        vid_path: input video path
        output_path: where the sliced video is written
        start_frame: first frame to keep
        end_frame: last frame to keep (end_frame + 1 is passed to ffmpeg's trim)
        keep_audio: if true, the audio is trimmed to the matching time span
                and muxed with the video
        overwrite: if false and output_path already exists, nothing is done

    Returns:
        output_path, or None when the output exists and overwrite is false

    Raises:
        ValueError: if the output directory does not exist, or if keep_audio
            is set and the frame rate of the input cannot be determined
    '''
    # a bare filename has an empty dirname; treat it as the current directory
    output_dir = os.path.dirname(output_path) or '.'
    if not os.path.isdir(output_dir):
        raise ValueError(f'output_path directory does not exists: {output_dir}')
    if os.path.isfile(output_path) and not overwrite:
        warnings.warn(f'{output_path} already exists but overwrite switch is False, nothing done.')
        return None

    input_vid = ffmpeg.input(vid_path)
    stop_frame = end_frame + 1

    if keep_audio:
        # the frame rate is only needed to convert frame numbers into seconds
        vid_info = VidInfo(vid_path)
        if vid_info is None or not vid_info['fps'] > 0:
            raise ValueError(f'vid_slicer: cannot determine frame rate of {vid_path}')
        fps = vid_info['fps']
        vid = (
            input_vid
            .trim(start_frame = start_frame, end_frame = stop_frame)
            .setpts('PTS-STARTPTS')
        )
        aud = (
            input_vid
            .filter_('atrim', start = start_frame / fps, end = stop_frame / fps)
            .filter_('asetpts', 'PTS-STARTPTS')
        )
        joined = ffmpeg.concat(vid, aud, v = 1, a = 1).node
        output = ffmpeg.output(joined[0], joined[1], f'{output_path}').overwrite_output()
        output.run()
    else:
        (
            input_vid
            .trim(start_frame = start_frame, end_frame = stop_frame)
            .setpts('PTS-STARTPTS')
            .output(f'{output_path}')
            .overwrite_output()
            .run()
        )
    return output_path
def vid_resize(vid_path, output_path, width, overwrite = False):
    '''
    use ffmpeg to resize the input video to the width given, keeping aspect ratio

    Args:
        vid_path: input video path
        output_path: where the resized video is written
        width: target width passed to ffmpeg's scale filter
        overwrite: if false and output_path already exists, nothing is done

    Returns:
        output_path, or None when the output exists and overwrite is false

    Raises:
        ValueError: if the output directory does not exist
    '''
    # a bare filename has an empty dirname; treat it as the current directory
    output_dir = os.path.dirname(output_path) or '.'
    if not os.path.isdir(output_dir):
        raise ValueError(f'output_path directory does not exists: {output_dir}')
    if os.path.isfile(output_path) and not overwrite:
        warnings.warn(f'{output_path} already exists but overwrite switch is False, nothing done.')
        return None

    # NOTE(review): a height of -1 may resolve to an odd number, which some
    # encoders reportedly reject; -2 would force an even height - confirm
    # before changing.
    (
        ffmpeg
        .input(vid_path)
        .filter('scale', width, -1)
        .output(output_path)
        .overwrite_output()
        .run()
    )
    return output_path
def vid_reduce_framerate(vid_path, output_path, new_fps, overwrite = False):
    '''
    use ffmpeg's fps filter to re-time the input video to new_fps

    Args:
        vid_path: input video path
        output_path: where the converted video is written
        new_fps: target frame rate, passed as the fps filter's `fps` option
                (with round = 'up')
        overwrite: if false and output_path already exists, nothing is done

    Returns:
        output_path, or None when the output exists and overwrite is false

    Raises:
        ValueError: if the output directory does not exist
    '''
    # a bare filename has an empty dirname; treat it as the current directory
    output_dir = os.path.dirname(output_path) or '.'
    if not os.path.isdir(output_dir):
        raise ValueError(f'output_path directory does not exists: {output_dir}')
    if os.path.isfile(output_path) and not overwrite:
        warnings.warn(f'{output_path} already exists but overwrite switch is False, nothing done.')
        return None

    (
        ffmpeg
        .input(vid_path)
        .filter('fps', fps = new_fps, round = 'up')
        .output(output_path)
        .overwrite_output()
        .run()
    )
    return output_path
def seek_frame_count(VidReader, cv2_frame_count, guess_within = 0.1,
                     seek_rate = 1, bDebug = False):
    '''
    Probe a video reader backwards from the cv2 frame count to find how many
    frames it can actually serve (imageio/ffmpeg may disagree with cv2).

    Indices are tried from high to low in steps of seek_rate. The first index
    that can be read either confirms the count (when it is the last index
    below cv2_frame_count) or starts a narrower search with half the step.

    Args:
        VidReader: ImageIO video reader object with method .get_data()
        cv2_frame_count: frame count from cv2, used as exclusive upper bound
        guess_within: fraction of cv2_frame_count, below the count, to search in
        seek_rate: step between probed indices; values below 1 are treated as 1
        bDebug: if true, show a progress bar and print every missing index

    Returns:
        the frame count, or None if no probed index could be read
    '''
    lowest_index = int(cv2_frame_count * (1-guess_within))
    step = max(seek_rate, 1)

    candidates = reversed(range(lowest_index, cv2_frame_count, step))
    if bDebug:
        candidates = tqdm(candidates, desc = 'seeking frame')
        print(f'seeking from {lowest_index} to {cv2_frame_count} with seek_rate of {step}')

    for idx in candidates:
        try:
            VidReader.get_data(idx)
        except IndexError:
            # index is past the end of the stream, try the next lower one
            if bDebug:
                print(f'{idx} not found.')
        else:
            # Frame Found
            found_count = idx + 1
            if found_count == cv2_frame_count:
                print(f'seek_frame_count: found frame count at {found_count}')
                return found_count
            # the real end lies between idx and idx + step: search that
            # window again with a finer step
            narrowed_upper = idx + step
            return seek_frame_count(VidReader,
                                    cv2_frame_count = narrowed_upper,
                                    guess_within = step / narrowed_upper,
                                    seek_rate = int(step/2),
                                    bDebug = bDebug)
    return None
def _load_cv2_frame(frame):
    '''Return frame as an array for cv2.VideoWriter, reading it from disk when a filename is given.'''
    if isinstance(frame, np.ndarray):
        return frame
    if not os.path.isfile(frame):
        raise ValueError(f'VidWriter: lFrames must be list of images (np.array) or filenames')
    image = cv2.imread(frame)
    if image is None:
        raise ValueError(f'VidWriter: cannot read image file {frame}')
    return image


def _compress_mp4_inplace(video_path, crf):
    '''Re-encode video_path in place with libx264 at the given crf; return True on success.'''
    root, ext = os.path.splitext(video_path)
    tmp_path = f'{root}_tmp{ext}'
    os.replace(video_path, tmp_path)
    try:
        (
            ffmpeg
            .input(tmp_path)
            .output(video_path, vcodec = 'libx264', crf = crf)
            .overwrite_output()
            .run()
        )
    except (ffmpeg.Error, OSError) as e:
        print(f'\tWARNING: Compression Failed\n\tError Msg: {e}')
        # put the uncompressed video back so the caller still has an output
        os.replace(tmp_path, video_path)
        return False
    os.remove(tmp_path)
    return True


def VidWriter(lFrames, output_path, strFourcc = 'MP4V', verbose = False, intFPS = 20, crf = None,
              use_imageio = False):
    '''
    Given a list of images in numpy array format, it outputs a MP4 file

    Args:
        lFrames: list of numpy arrays or image filenames
        output_path: a MP4 file path; its directory is created when missing
        strFourcc: four letter video codec; XVID is more preferable. MJPG results in high size video. X264 gives very small size video; see https://opencv-python-tutroals.readthedocs.io/en/latest/py_tutorials/py_gui/py_video_display/py_video_display.html
        verbose: if true, print progress information
        intFPS: frame rate of the written video
        crf: Constant Rate Factor for ffmpeg video compression (18 to 24);
                when set, the written file is re-encoded with libx264
        use_imageio: if true, write with ImageIO instead of OpenCV

    Returns:
        output_path

    Raises:
        ValueError: if output_path is not an .mp4 path, crf is out of range,
            lFrames is empty (OpenCV path), or an entry of lFrames is neither
            an array nor an existing image file
    '''
    s_time = time.time()

    if not output_path.endswith('.mp4'):
        raise ValueError(f'VidWriter: only mp4 video output supported.')

    if crf:
        crf = int(crf)
        if crf > 24 or crf < 18:
            raise ValueError(f'VidWriter: crf must be between 18 and 24')

    # a bare filename has no directory part; there is nothing to create then
    output_dir = os.path.dirname(output_path)
    if output_dir and not os.path.exists(output_dir):
        print(f'\t{output_dir} does not exist.\n\tCreating video file output directory: {output_dir}')
        os.makedirs(output_dir)

    if use_imageio:
        writer = imageio.get_writer(output_path, fps = intFPS)
        try:
            for frame in tqdm(lFrames, desc = "Writing video using ImageIO"):
                if not isinstance(frame, np.ndarray):
                    # read from filename
                    if not os.path.isfile(frame):
                        raise ValueError(f'VidWriter: lFrames must be list of images (np.array) or filenames')
                    frame = imageio.imread(frame)
                writer.append_data(frame)
        finally:
            # close even when a frame is rejected so the file handle is not leaked
            writer.close()
    else:
        if len(lFrames) == 0:
            raise ValueError('VidWriter: lFrames is empty, nothing to write.')
        # init OpenCV Vid Writer; the first frame defines the video size
        H, W = _load_cv2_frame(lFrames[0]).shape[:2]
        fourcc = cv2.VideoWriter_fourcc(*strFourcc)
        if verbose:
            print(f'\tEncoding using fourcc: {strFourcc}')
        writer = cv2.VideoWriter(output_path, fourcc, fps = intFPS, frameSize = (W, H), isColor = True)
        try:
            for frame in tqdm(lFrames, desc = "Writing video using OpenCV"):
                writer.write(_load_cv2_frame(frame))
        finally:
            writer.release()

    # Output
    r_time = "{:.2f}".format(max(time.time() - s_time, 0.01))
    if verbose:
        print(f'\t{output_path} written in {r_time} ({len(lFrames)/float(r_time)} fps)')

    if crf:
        if verbose:
            print(f'\tCompressing {output_path} with FFmpeg using crf: {crf}')
        # VidCompress only exists as commented-out code in this file, so the
        # compression is done by a local helper built on ffmpeg-python
        isCompressed = _compress_mp4_inplace(output_path, crf)
        if verbose:
            print(f'\tCompressed: {isCompressed}')

    return output_path
def im_dir_to_video(im_dir, output_path, fps, tup_im_extension = ('.jpg',),
                    max_long_edge = 600, filename_len = 6, pixel_format = 'yuv420p',
                    tqdm_func = tqdm):
    '''turn a directory of images into video using ffmpeg
    ref: https://github.com/kkroening/ffmpeg-python/issues/95#issuecomment-401428324

    Args:
        im_dir: directory holding the frames; when filename_len is set, the
                part of each filename before the first '.' must be an integer
        output_path: path of the video to write
        fps: frame rate given to ffmpeg for the image sequence
        tup_im_extension: image extension(s) to consider, a str or a tuple of
                str. ffmpeg reads a single glob pattern: '.jpg' is used when
                it is among the extensions, otherwise the first one given
        max_long_edge: currently unused
        filename_len: ensure frame number are zero padded; 0 will skip this step
        pixel_format: for list of supported formats see https://en.wikipedia.org/wiki/FFmpeg#Pixel_formats
        tqdm_func: progress-bar wrapper called as tqdm_func(iterable, desc = ...)

    Returns:
        output_path

    Raises:
        ValueError: if tup_im_extension is empty, or a filename does not
            start with an integer while filename_len is set
    '''
    # str.endswith accepts both forms, but the glob below needs real entries
    if isinstance(tup_im_extension, str):
        extensions = (tup_im_extension,)
    else:
        extensions = tuple(tup_im_extension)
    if not extensions:
        raise ValueError('im_dir_to_video: tup_im_extension must not be empty')

    if filename_len:
        # Ensure Filenames are Zero padded
        frame_names = [f for f in os.listdir(im_dir) if f.endswith(extensions)]
        frame_names.sort(key = lambda f: int(f.split('.')[0]))
        for f in tqdm_func(frame_names, desc = 'ensuring image filenames are zero padded'):
            fname, fext = os.path.splitext(f)
            padded_f = fname.zfill(filename_len) + fext
            if not os.path.isfile(os.path.join(im_dir, padded_f)):
                shutil.move(os.path.join(im_dir, f), os.path.join(im_dir, padded_f))
                # no symlink back to f: it would duplicate the frames in video generation

    #TODO: ensure image size are divisible by 2
    # glob the same extension that was padded above; '.jpg' keeps priority so
    # existing callers get the pattern they always had
    glob_extension = '.jpg' if '.jpg' in extensions else extensions[0]
    # os.path.join keeps an empty im_dir relative instead of turning it into '/'
    im_stream_string = os.path.join(im_dir, f'*{glob_extension}')
    # we need to escape special characters
    im_stream_string = im_stream_string.translate(
        str.maketrans(
            {'[': r'\[',
             ']': r'\]'})
    )
    (
        ffmpeg
        .input(im_stream_string, pattern_type = 'glob', framerate = fps)
        .filter('format', pixel_format)
        # .filter('pad', 'ceil(iw/2)*2:ceil(ih/2)*2')
        .output(output_path)
        .run()
    )
    return output_path
#
# def VidCompress(input_path, output_path = None, crf = 24, use_ffmpy = False):
# '''
# Compress input_path video (mp4 only) using ffmpy
# crf: Constant Rate Factor for ffmpeg video compression, must be between 18 and 24
# use_ffmpy: use ffmpy instead of commandline call to ffmpeg
# '''
# if not input_path.endswith('.mp4'):
# print(f'\tFATAL: only mp4 videos supported.')
# return None
#
# output_fname = output_path if output_path else input_path
# tmp_fname = input_path.replace(".mp4","_tmp.mp4")
# os.rename(input_path, tmp_fname)
#
# try:
# if not use_ffmpy:
# #os.popen(f'ffmpeg -i {tmp_fname} -vcodec libx264 -crf {crf} {output_fname}')
#
# cmdOut = subprocess.Popen(['ffmpeg', '-i', tmp_fname, '-vcodec', 'libx264', '-crf', str(crf), output_fname],
# stdout = subprocess.PIPE,
# stderr = subprocess.STDOUT)
# stdout, stderr = cmdOut.communicate()
# if not stderr:
# os.remove(tmp_fname)
# return True
# else:
# return False
# else:
# ff = FFmpeg(
# inputs = {tmp_fname : None},
# outputs = {output_fname : f'-vcodec libx264 -crf {crf}'}
# )
# ff.run()
#
# os.remove(tmp_fname)
# return True
#
# except OSError as e:
# print(f'\tWARNING: Compression Failed; OSError\n\tLikely out of RAM\n\tError Msg: {e}')
# os.rename(tmp_fname, output_fname)
# return False