import concurrent.futures
import glob
import os
import random
import time

import cv2
import numpy as np
from moviepy.editor import VideoFileClip, CompositeVideoClip, ColorClip
from tqdm import tqdm
def _load_emoji(path, target_height):
    """Read an emoji image and scale it to ``target_height`` pixels tall.

    Returns the resized image in RGB(A) channel order, or ``None`` when the
    file cannot be read, so that a missing asset degrades to "no emoji"
    instead of crashing the whole render.
    """
    image = cv2.imread(path, cv2.IMREAD_UNCHANGED)
    if image is None or image.shape[0] == 0:
        print(f"Warning: could not read emoji image {path!r}; skipping it")
        return None

    # OpenCV decodes colour images as BGR(A), but the overlay buffer is handed
    # to moviepy as RGB frames, so swap the channel order once on load.
    if image.ndim == 3 and image.shape[2] == 4:
        image = cv2.cvtColor(image, cv2.COLOR_BGRA2RGBA)
    elif image.ndim == 3 and image.shape[2] == 3:
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    aspect = image.shape[1] / image.shape[0]
    target_width = max(1, int(target_height * aspect))
    return cv2.resize(image, (target_width, target_height))


def _blend_rgba(background, foreground, x, y):
    """Alpha-blend ``foreground`` onto the RGBA ``background`` in place.

    ``(x, y)`` is the top-left corner of ``foreground`` in background
    coordinates. The foreground is clipped to the canvas; nothing is drawn
    when it lies completely outside.
    """
    fg_h, fg_w = foreground.shape[:2]
    bg_h, bg_w = background.shape[:2]

    # Destination rectangle, clipped to the canvas.
    y1, y2 = max(0, y), min(bg_h, y + fg_h)
    x1, x2 = max(0, x), min(bg_w, x + fg_w)
    if y1 >= y2 or x1 >= x2:
        return

    if foreground.ndim == 2:
        # Grayscale image: the intensity doubles as coverage. It must be
        # normalised to 0..1 just like a real alpha channel.
        alpha = foreground / 255.0
        color = cv2.cvtColor(foreground, cv2.COLOR_GRAY2BGR)
    elif foreground.shape[2] >= 4:
        alpha = foreground[:, :, 3] / 255.0
        color = foreground[:, :, :3]
    else:
        # No alpha channel available: treat the image as fully opaque.
        alpha = np.ones((fg_h, fg_w), dtype=np.float64)
        color = foreground[:, :, :3]

    alpha_slice = alpha[y1 - y:y2 - y, x1 - x:x2 - x]
    alpha_expanded = np.expand_dims(alpha_slice, axis=-1)
    color_slice = color[y1 - y:y2 - y, x1 - x:x2 - x]

    background[y1:y2, x1:x2, :3] = (
        background[y1:y2, x1:x2, :3] * (1 - alpha_expanded)
        + color_slice * alpha_expanded
    )
    background[y1:y2, x1:x2, 3] = (
        background[y1:y2, x1:x2, 3] * (1 - alpha_slice) + 255 * alpha_slice
    )


def create_text_overlay(text, subtitle, width, height, start_time, duration):
    """Build a caption clip: a translucent box with a title and a subtitle.

    The caption is rendered once into an RGBA buffer with OpenCV and wrapped
    in a moviepy clip whose mask comes from the buffer's alpha channel.

    Args:
        text: Title string. When it contains ``"AirLetters"`` the larger
            title-card layout is used and the envelope / wind emoji images
            from ``emoji/`` are drawn left and right of the title.
        subtitle: Subtitle string. ``"\\n"`` separates lines; on non-title
            cards a subtitle longer than 50 characters is split into two
            lines at the middle word.
        width: Overlay width in pixels.
        height: Overlay height in pixels.
        start_time: Value passed to ``set_start`` on the resulting clip.
        duration: Value passed to ``set_duration`` on the resulting clip.

    Returns:
        The moviepy clip carrying the caption, with 0.5 s fade-in/out applied.
    """
    # `font` was referenced but never defined in the original code;
    # FONT_HERSHEY_SIMPLEX is assumed here -- TODO confirm the intended face.
    font = cv2.FONT_HERSHEY_SIMPLEX
    thickness = 2
    title_color = (138, 223, 178, 255)
    subtitle_color = (255, 255, 255, 255)

    is_title_card = "AirLetters" in text
    title_scale, subtitle_scale = (3.0, 1.5) if is_title_card else (2.0, 1.0)

    # Transparent canvas with a semi-opaque black box across the middle third.
    overlay = np.zeros((height, width, 4), dtype=np.uint8)
    box_width = int(width * 0.75)
    box_x_start = (width - box_width) // 2
    cv2.rectangle(
        overlay,
        (box_x_start, height // 3),
        (box_x_start + box_width, 2 * height // 3),
        (0, 0, 0, 180),
        -1,
    )

    # Centre the title horizontally inside the box.
    title_size = cv2.getTextSize(text, font, title_scale, thickness)[0]
    title_x = box_x_start + (box_width - title_size[0]) // 2
    title_y = height // 2

    if is_title_card:
        target_height = int(title_scale * 30)
        emoji_y = title_y - target_height + 5
        envelope_emoji = _load_emoji("emoji/envelope.png", target_height)
        wind_emoji = _load_emoji("emoji/wind.png", target_height)
        if envelope_emoji is not None:
            envelope_x = title_x - envelope_emoji.shape[1] - 20
            _blend_rgba(overlay, envelope_emoji, envelope_x, emoji_y)
        if wind_emoji is not None:
            wind_x = title_x + title_size[0] + 20
            _blend_rgba(overlay, wind_emoji, wind_x, emoji_y)
    elif len(subtitle) > 50:
        # Long subtitles are wrapped into two lines at the middle word.
        words = subtitle.split()
        mid = len(words) // 2
        subtitle = " ".join(words[:mid]) + "\n" + " ".join(words[mid:])

    cv2.putText(overlay, text, (title_x, title_y), font, title_scale,
                title_color, thickness)

    if "\n" in subtitle:
        subtitle_y = title_y + 50
        for line in subtitle.split("\n"):
            line_width = cv2.getTextSize(line, font, subtitle_scale, thickness)[0][0]
            line_x = box_x_start + (box_width - line_width) // 2
            cv2.putText(overlay, line, (line_x, subtitle_y), font,
                        subtitle_scale, subtitle_color, thickness)
            subtitle_y += 50
    else:
        subtitle_width = cv2.getTextSize(subtitle, font, subtitle_scale, thickness)[0][0]
        subtitle_x = box_x_start + (box_width - subtitle_width) // 2
        cv2.putText(overlay, subtitle, (subtitle_x, title_y + 60), font,
                    subtitle_scale, subtitle_color, thickness)

    # The caption is static, so compute the frame and its mask once instead
    # of re-slicing and dividing the buffer for every rendered frame.
    rgb_frame = overlay[:, :, :3]
    alpha_frame = overlay[:, :, 3:4] / 255.0

    overlay_clip = ColorClip(size=(width, height), color=[0, 0, 0, 0])
    overlay_clip.mask = ColorClip(size=(width, height), color=[1, 1, 1, 1])
    overlay_clip.mask.get_frame = lambda t: alpha_frame
    overlay_clip.get_frame = lambda t: rgb_frame
    overlay_clip = overlay_clip.set_start(start_time)
    overlay_clip = overlay_clip.set_duration(duration)
    # NOTE(review): get_frame is overridden on the instance above; confirm
    # that fadein/fadeout still take effect on the rendered caption.
    overlay_clip = overlay_clip.fadein(0.5).fadeout(0.5)
    return overlay_clip
def load_video(args):
    """Load one video and turn it into a positioned, looping grid tile.

    Args:
        args: Tuple ``(video_path, target_size, padding, idx, grid_width)``.
            It is packed into a single argument so the function can be used
            with ``executor.map``. ``idx`` is the tile's index in the grid and
            ``grid_width`` the number of tiles per row.

    Returns:
        A clip of size ``target_size + 2 * padding`` square (the video on a
        white background) positioned at its grid cell, or ``None`` when the
        video could not be processed. Failures are reported with ``print``
        and never raised, so one bad file does not abort the whole montage.
    """
    video_path, target_size, padding, idx, grid_width = args
    cell_size = target_size + padding * 2
    source = None
    try:
        source = VideoFileClip(video_path, audio=False)
        clip = source.resize(height=target_size)
        if clip.w > target_size:
            # Centre-crop wide videos to a square tile.
            clip = clip.crop(x1=(clip.w - target_size) // 2,
                             x2=(clip.w + target_size) // 2)
        clip = clip.loop()

        # White background slightly larger than the video provides the border.
        bg = ColorClip(size=(cell_size, cell_size), color=(255, 255, 255))
        tile = CompositeVideoClip([bg, clip.set_position((padding, padding))])

        x = (idx % grid_width) * cell_size
        y = (idx // grid_width) * cell_size
        return tile.set_position((x, y))
    except Exception as e:
        print(f"\nError processing {video_path}: {str(e)}")
        # Release the file reader if the video was opened before the failure;
        # otherwise it would stay open for the rest of the run.
        if source is not None:
            source.close()
        return None
# Fallback timeline values (seconds). The names FIRST_PHASE, TRANSITION and
# TOTAL_DURATION were referenced but not defined in this file, so these are
# placeholders -- NOTE(review): confirm the intended durations.
_FALLBACK_TIMINGS = {
    "FIRST_PHASE": 5.0,
    "TRANSITION": 3.0,
    "TOTAL_DURATION": 15.0,
}


def _resolve_timing(name, override):
    """Pick a timing value: explicit argument, module constant, then fallback."""
    if override is not None:
        return float(override)
    return float(globals().get(name, _FALLBACK_TIMINGS[name]))


def _zoom_crop_size(t, w, h, first_phase, transition, max_zoom=3.0):
    """Return the (width, height) of the centre crop to show at time ``t``.

    Full frame during the first phase, linear zoom from 1x to ``max_zoom``
    during the transition, then constant ``max_zoom``.
    """
    if t < first_phase:
        return (w, h)
    if t < first_phase + transition:
        progress = (t - first_phase) / transition
        zoom_factor = 1 + progress * (max_zoom - 1)
    else:
        zoom_factor = max_zoom
    return (int(w / zoom_factor), int(h / zoom_factor))


def create_montage(video_dir, output_path, width=1920, height=1080, fps=30, *,
                   first_phase=None, transition=None, total_duration=None):
    """Render a grid montage of looping videos with a zoom-in and two captions.

    All ``*.mp4`` files in ``video_dir`` (randomly sub-sampled if there are
    more than the grid holds) are loaded as square tiles, composited onto a
    black canvas, zoomed towards the centre after the first phase, overlaid
    with two captions and written to ``output_path`` with libx264.

    Args:
        video_dir: Directory searched for ``*.mp4`` files.
        output_path: Path of the video file to write.
        width: Output width in pixels.
        height: Output height in pixels.
        fps: Output frame rate.
        first_phase: Seconds the un-zoomed grid and the first caption are
            shown. Defaults to the module constant ``FIRST_PHASE`` if it
            exists, otherwise to a built-in fallback.
        transition: Seconds the zoom from 1x to 3x takes (``TRANSITION``).
        total_duration: Length of the output in seconds (``TOTAL_DURATION``).

    Raises:
        ValueError: If the timings are invalid, the frame is too small for
            the grid, no ``.mp4`` file is found, or no video could be loaded.
    """
    first_phase = _resolve_timing("FIRST_PHASE", first_phase)
    transition = _resolve_timing("TRANSITION", transition)
    total_duration = _resolve_timing("TOTAL_DURATION", total_duration)
    if first_phase < 0 or transition < 0 or total_duration <= 0:
        raise ValueError(
            "first_phase and transition must be >= 0 and total_duration > 0 "
            f"(got {first_phase}, {transition}, {total_duration})"
        )

    print("Starting video creation...")
    start_time = time.time()

    video_paths = glob.glob(os.path.join(video_dir, "*.mp4"))
    if not video_paths:
        # Fail before spinning up workers when there is nothing to render.
        raise ValueError(f"No .mp4 files found in {video_dir!r}")

    # Grid with roughly `base_grid_videos` square cells in a 16:9 layout.
    base_grid_videos = 400
    aspect_ratio = 16 / 9
    grid_width = int(np.sqrt(base_grid_videos * aspect_ratio))
    grid_height = int(np.sqrt(base_grid_videos / aspect_ratio))
    padding = 1
    target_size = min(width // grid_width, height // grid_height) - padding * 2
    if target_size <= 0:
        raise ValueError(
            f"Frame {width}x{height} is too small for a "
            f"{grid_width}x{grid_height} grid"
        )
    print(f"Creating grid of {grid_width}x{grid_height} videos")
    print(f"Video size: {target_size}x{target_size} pixels")

    needed_videos = grid_width * grid_height
    if len(video_paths) > needed_videos:
        video_paths = random.sample(video_paths, needed_videos)

    args_list = [(path, target_size, padding, idx, grid_width)
                 for idx, path in enumerate(video_paths)]
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        loaded = list(tqdm(
            executor.map(load_video, args_list),
            total=len(args_list),
            desc="Loading videos"
        ))
    clips = [clip for clip in loaded if clip is not None]
    if not clips:
        raise ValueError("No videos were successfully loaded!")

    final = None
    try:
        bg = ColorClip((width, height), color=(0, 0, 0))
        print("Creating video composition...")
        video_comp = CompositeVideoClip([bg] + clips, size=(width, height))
        w, h = video_comp.size

        def apply_zoom(gf, t):
            """Crop the centre of the frame and scale it back to full size."""
            frame = gf(t)
            cw, ch = _zoom_crop_size(t, w, h, first_phase, transition)
            if cw >= w or ch >= h:
                return frame
            x = (w - cw) // 2
            y = (h - ch) // 2
            cropped = frame[y:y + ch, x:x + cw]
            return cv2.resize(cropped, (w, h), interpolation=cv2.INTER_LINEAR)

        video_comp = video_comp.fl(apply_zoom)
        video_comp = video_comp.set_duration(total_duration)

        layers = [video_comp]
        if first_phase > 0:
            layers.append(create_text_overlay(
                "AirLetters",
                "\nAn Open Video Dataset of Characters Drawn in the Air",
                width, height, 0, first_phase
            ))

        # NOTE(review): the original call passed only the two strings. The
        # second caption is scheduled from the end of the zoom until the end
        # of the video -- confirm this is the intended timing.
        second_start = first_phase + transition
        second_duration = total_duration - second_start
        if second_duration > 0:
            layers.append(create_text_overlay(
                "Novel Video Understanding Benchmark",
                "for evaluating the ability to understand articulated motions which requires very strong temporal capabilities, a task very challenging for current models",
                width, height, second_start, second_duration
            ))

        final = CompositeVideoClip(layers)
        print("Writing final video...")
        final.write_videofile(
            output_path,
            fps=fps,
            codec='libx264',
            audio=False,
            threads=16,
            logger='bar'
        )
    finally:
        # Release clips even when composition or encoding fails.
        print("Cleaning up...")
        if final is not None:
            final.close()
        for clip in clips:
            clip.close()

    print(f"\nTotal processing time: {time.time() - start_time:.2f} seconds")
    print(f"Output saved to: {output_path}")
if __name__ == "__main__":
    # Script entry point: render the montage from the bundled dataset
    # directory into a 30 fps output file.
    montage_options = {
        "video_dir": "airletters/videos",
        "output_path": "30fps.mp4",
        "fps": 30,
    }
    create_montage(**montage_options)