from pathlib import Path
from typing import List, Union
import random
import re
from datetime import timedelta

from tqdm import trange
import numpy as np
import librosa
import cv2
from zhon.hanzi import punctuation as zh_punc

from moviepy.editor import ImageClip, AudioFileClip, CompositeAudioClip, \
    CompositeVideoClip, ColorClip, VideoFileClip, VideoClip, TextClip, concatenate_audioclips
import moviepy.video.compositing.transitions as transfx
from moviepy.audio.AudioClip import AudioArrayClip
from moviepy.audio.fx.all import audio_loop
from moviepy.video.tools.subtitles import SubtitlesClip


def generate_srt(timestamps: List,
                 captions: List,
                 save_path: Union[str, Path],
                 max_single_length: int = 30):
    """Write an SRT subtitle file for the given captions.

    Each caption is wrapped with ``split_caption``; the resulting lines share
    the caption's time span equally and each line becomes its own SRT cue.

    Args:
        timestamps (List): ``[start, end]`` pairs in seconds, one per caption.
        captions (List): Caption strings, indexed like ``timestamps``.
        save_path (Union[str, Path]): Destination of the ``.srt`` file.
        max_single_length (int): Maximum characters per subtitle line.

    Raises:
        IndexError: If ``captions`` has fewer entries than ``timestamps``.
    """

    def format_time(seconds: float) -> str:
        td = timedelta(seconds=seconds)
        # Use the integer fields of the timedelta: subtracting floats
        # (e.g. 1.001 - 1) truncates to the wrong millisecond.
        total_seconds = td.days * 86400 + td.seconds
        millis = td.microseconds // 1000
        hours, remainder = divmod(total_seconds, 3600)
        minutes, secs = divmod(remainder, 60)
        return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"

    srt_entries = []
    for idx, (start_time, end_time) in enumerate(timestamps):
        wrapped = split_caption(captions[idx], max_single_length)
        # Drop blank lines so an empty caption produces no cue at all.
        chunks = [chunk for chunk in wrapped.split("\n") if chunk.strip()]
        if not chunks:
            continue

        segment_duration = (end_time - start_time) / len(chunks)
        for chunk_idx, chunk in enumerate(chunks):
            chunk_start = start_time + segment_duration * chunk_idx
            chunk_end = start_time + segment_duration * (chunk_idx + 1)
            # One list entry per cue, so the 1-based cue number is simply
            # the number of entries written so far plus one.
            srt_entries.append(
                f"{len(srt_entries) + 1}\n"
                f"{format_time(chunk_start)} --> {format_time(chunk_end)}\n"
                f"{chunk}\n\n"
            )

    # NOTE(review): written with the platform default encoding on purpose;
    # presumably SubtitlesClip reads the file back with the same default --
    # confirm against the installed moviepy before forcing UTF-8 here.
    with open(save_path, 'w') as srt_file:
        srt_file.writelines(srt_entries)


def add_caption(captions: List,
                srt_path: Union[str, Path],
                timestamps: List,
                video_clip: VideoClip,
                max_single_length: int = 30,
                **caption_config):
    """Generate an SRT file and composite its subtitles onto a video clip.

    Args:
        captions (List): Caption strings, one per timestamp.
        srt_path (Union[str, Path]): Where the generated SRT file is written.
        timestamps (List): ``[start, end]`` pairs in seconds.
        video_clip (VideoClip): Clip to draw the subtitles on.
        max_single_length (int): Maximum characters per subtitle line.
        **caption_config: Keyword arguments forwarded to ``TextClip``.

    Returns:
        CompositeVideoClip: ``video_clip`` with the subtitles positioned at
        ("center", "bottom").
    """
    generate_srt(timestamps, captions, srt_path, max_single_length)

    def make_text_clip(txt):
        return TextClip(txt, **caption_config)

    subtitles = SubtitlesClip(str(srt_path), make_text_clip)
    positioned = subtitles.set_position(("center", "bottom"), relative=True)
    return CompositeVideoClip([video_clip, positioned])


def split_keep_separator(text, separator):
    """Split ``text`` on any character of ``separator``, keeping separators.

    Args:
        text: String to split.
        separator: String whose individual characters act as delimiters.

    Returns:
        list: Alternating text pieces and single-character separators, as
        produced by ``re.split`` with a capturing character class.
    """
    delimiter_class = f'([{re.escape(separator)}])'
    return re.split(delimiter_class, text)


def split_caption(caption, max_length=30):
    """Wrap a caption into lines of at most ``max_length`` characters.

    Captions starting with an ASCII letter are wrapped on spaces; all other
    captions are wrapped at the punctuation marks in ``zh_punc``. A single
    word or clause longer than ``max_length`` is kept whole on its own line.

    Args:
        caption: Caption text.
        max_length: Target maximum number of characters per line.

    Returns:
        str: The wrapped lines joined with newlines ("" for an empty caption).
    """
    if not caption:
        return ""

    lines = []
    first_char = caption[0]
    if "a" <= first_char <= "z" or "A" <= first_char <= "Z":
        current_words = []
        for word in caption.split(" "):
            if len(" ".join(current_words + [word])) <= max_length:
                current_words.append(word)
            else:
                if current_words:
                    lines.append(" ".join(current_words))
                # The overflowing word opens the next line; resetting to an
                # empty list here would silently drop it from the caption.
                current_words = [word]
        if current_words:
            lines.append(" ".join(current_words))
    else:
        punctuation_marks = tuple(zh_punc)
        current_line = ""
        for sentence in split_keep_separator(caption, zh_punc):
            if len(current_line + sentence) <= max_length:
                current_line += sentence
                continue

            if current_line:
                lines.append(current_line)
                current_line = ""
            if sentence.startswith(punctuation_marks):
                # Keep punctuation attached to the line it terminates rather
                # than letting it start the next one.
                if lines:
                    lines[-1] += sentence[0]
                current_line = sentence[1:]
            else:
                current_line = sentence
        if current_line:
            lines.append(current_line.strip())

    return '\n'.join(lines)


def add_bottom_black_area(clip: VideoFileClip,
                          black_area_height: int = 64):
    """
    Overlay a black bar along the bottom of the video clip (for captions).

    The bar is composited on top of ``clip`` at ("center", "bottom"); the
    composite is built without an explicit size.

    Args:
        clip (VideoFileClip): Video clip to be processed.
        black_area_height (int): Height of the black area.

    Returns:
        CompositeVideoClip: ``clip`` with the black bar composited over it.
    """
    black_bar = ColorClip(
        size=(clip.w, black_area_height),
        color=(0, 0, 0),
        duration=clip.duration,
    )
    return CompositeVideoClip([clip, black_bar.set_position(("center", "bottom"))])


def add_zoom_effect(clip, speed=1.0, mode='in', position='center'):
    """Apply a gradual zoom to a clip.

    The scale factor changes linearly between 1 and ``1 + 0.1 * speed`` over
    the clip; ``mode='out'`` runs it in reverse.

    Args:
        clip: Clip with ``fps`` and ``duration`` set.
        speed: Multiplier on the total zoom amount.
        mode: ``'out'`` to zoom out; any other value zooms in.
        position: Anchor of the zoom; one of 'center', 'left', 'right',
            'top', 'topleft', 'topright', 'bottom', 'bottomleft',
            'bottomright'.

    Returns:
        The clip transformed frame by frame with ``cv2.warpAffine``.

    Raises:
        KeyError: If ``position`` is not one of the supported anchors.
    """
    # Fraction of the (negative) size difference applied as translation on
    # each axis: 0 pins the left/top edge, 1 the right/bottom edge, 0.5 the
    # centre.
    anchors = {
        'center': (0.5, 0.5),
        'left': (0, 0.5),
        'right': (1, 0.5),
        'top': (0.5, 0),
        'topleft': (0, 0),
        'topright': (1, 0),
        'bottom': (0.5, 1),
        'bottomleft': (0, 1),
        'bottomright': (1, 1),
    }
    anchor_x, anchor_y = anchors[position]

    fps = clip.fps
    duration = clip.duration
    # Clips shorter than one frame would otherwise divide by zero below.
    total_frames = max(1, int(duration * fps))

    def main(getframe, t):
        frame = getframe(t)
        h, w = frame.shape[: 2]
        i = t * fps
        if mode == 'out':
            i = total_frames - i
        zoom = 1 + (i * ((0.1 * speed) / total_frames))
        tx = anchor_x * (w - (w * zoom))
        ty = anchor_y * (h - (h * zoom))
        M = np.array([[zoom, 0, tx], [0, zoom, ty]])
        return cv2.warpAffine(frame, M, (w, h))

    return clip.fl(main)


def add_move_effect(clip, direction="left", move_raito=0.95):
    """Pan horizontally across a slightly enlarged copy of the clip.

    The clip is resized by ``1 / move_raito`` and then moved at constant speed
    inside a composite of the original size.

    Args:
        clip: Clip with ``size`` and ``duration`` set.
        direction: ``"left"`` or ``"right"``.
        move_raito: Ratio of the original size to the enlarged size (the
            misspelled name is kept for backward compatibility).

    Returns:
        CompositeVideoClip: Clip of the original size showing the pan.

    Raises:
        ValueError: If ``direction`` is neither ``"left"`` nor ``"right"``.
    """
    orig_width, orig_height = clip.size[0], clip.size[1]
    new_width = int(orig_width / move_raito)
    new_height = int(orig_height / move_raito)
    # NOTE(review): both width and height are passed; which one moviepy's
    # resize honours when given both should be confirmed.
    clip = clip.resize(width=new_width, height=new_height)

    overflow = orig_width - new_width
    if direction == "left":
        start_position, end_position = (0, 0), (overflow, 0)
    elif direction == "right":
        start_position, end_position = (overflow, 0), (0, 0)
    else:
        raise ValueError(
            f"direction must be 'left' or 'right', got {direction!r}")

    duration = clip.duration
    moving_clip = clip.set_position(
        lambda t: (start_position[0] + (end_position[0] - start_position[0]) / duration * t,
                   start_position[1])
    )
    return CompositeVideoClip([moving_clip], size=(orig_width, orig_height))


def add_slide_effect(clips, slide_duration):
    """Chain clips with slide-out / slide-in transitions.

    CAUTION: requires at least ``slide_duration`` of silence at the end of
    each clip, because consecutive clips overlap by ``slide_duration``.

    The first clip slides out to the left. Every following clip slides in
    from the side opposite to where its predecessor left, and every middle
    clip slides out towards a randomly chosen side.

    Args:
        clips: Non-empty list of video clips, in playback order.
        slide_duration: Length of each transition in seconds.

    Returns:
        CompositeVideoClip: All clips laid out on one timeline.

    Raises:
        IndexError: If ``clips`` is empty.
    """
    if len(clips) == 1:
        # With a single clip there is nothing to transition to. Without this
        # guard the clip would be added twice (as first and as last), both at
        # t=0, doubling its video and audio.
        return CompositeVideoClip([clips[0]])

    durations = [clip.duration for clip in clips]
    out_to_in_mapping = {"left": "right", "right": "left"}

    first_clip = CompositeVideoClip(
        [clips[0].fx(transfx.slide_out, duration=slide_duration, side="left")]
    ).set_start(0)
    videos = [first_clip]
    previous_out_side = "left"

    for idx, clip in enumerate(clips[1: -1], start=1):
        # Middle clips slide in against the previous clip and out for the
        # next one; the entry side is dictated by the previous exit side.
        slide_in_side = out_to_in_mapping[previous_out_side]
        slide_out_side = "left" if random.random() <= 0.5 else "right"
        previous_out_side = slide_out_side

        entering = CompositeVideoClip(
            [clip.fx(transfx.slide_in, duration=slide_duration, side=slide_in_side)]
        )
        # Each earlier transition pulled the timeline back by slide_duration.
        entering = entering.set_start(sum(durations[:idx]) - slide_duration * idx)
        videos.append(
            entering.fx(transfx.slide_out, duration=slide_duration, side=slide_out_side)
        )

    last_clip = CompositeVideoClip(
        [clips[-1].fx(transfx.slide_in, duration=slide_duration,
                      side=out_to_in_mapping[previous_out_side])]
    ).set_start(sum(durations[:-1]) - slide_duration * (len(clips) - 1))
    videos.append(last_clip)

    return CompositeVideoClip(videos)


def _mean_rms(audio_path) -> float:
    """Return the mean frame-wise RMS energy of an audio file."""
    samples, _ = librosa.core.load(str(audio_path), sr=None)
    return librosa.feature.rms(y=samples)[0].mean()


def compose_video(story_dir: Union[str, Path],
                  save_path: Union[str, Path],
                  captions: List,
                  music_path: Union[str, Path],
                  num_pages: int,
                  fps: int = 10,
                  audio_sample_rate: int = 16000,
                  audio_codec: str = "mp3",
                  caption_config: Union[dict, None] = None,
                  max_single_caption_length: int = 30,
                  fade_duration: float = 1.0,
                  slide_duration: float = 0.4,
                  zoom_speed: float = 0.5,
                  move_ratio: float = 0.95,
                  sound_volume: float = 0.2,
                  music_volume: float = 0.2,
                  bg_speech_ratio: float = 0.4):
    """Assemble the narrated, captioned slideshow video for a story.

    For every page ``N`` (1-based) the function reads

    * ``story_dir/speech/pN.wav`` or, if absent, ``pN_<k>.wav`` files sorted
      by the integer ``k``;
    * ``story_dir/image/pN.png``;
    * optionally ``story_dir/sound/pN.wav`` as a background sound effect.

    Each page gets a random zoom or pan effect, pages are joined with slide
    transitions, a black caption bar and subtitles are added, background
    music is mixed in and the result is written to ``save_path``. The
    subtitle file is written to ``story_dir/captions.srt``.

    Args:
        story_dir (Union[str, Path]): Directory holding the page assets.
        save_path (Union[str, Path]): Output video path.
        captions (List): One caption per speech file, in playback order.
        music_path (Union[str, Path]): Background music file.
        num_pages (int): Number of pages (must be >= 1).
        fps (int): Frame rate assigned to each image clip.
        audio_sample_rate (int): Sample rate used for all audio clips.
        audio_codec (str): Audio codec passed to ``write_videofile``.
        caption_config (dict): ``TextClip`` keyword arguments. An optional
            ``"area_height"`` entry sets the black bar height and is not
            forwarded to ``TextClip``. The dict is not modified.
        max_single_caption_length (int): Maximum characters per subtitle line.
        fade_duration (float): Silence / cross-fade length around the speech.
        slide_duration (float): Length of the slide transition between pages.
        zoom_speed (float): ``speed`` passed to ``add_zoom_effect``.
        move_ratio (float): ``move_raito`` passed to ``add_move_effect``.
        sound_volume (float): Base volume factor of the page sound effects.
        music_volume (float): Base volume factor of the background music.
        bg_speech_ratio (float): Scales background audio relative to the
            speech RMS energy.

    Raises:
        ValueError: If ``num_pages`` is smaller than 1.
        FileNotFoundError: If a page has no speech file.
    """
    if num_pages < 1:
        raise ValueError("num_pages must be at least 1")

    story_dir = Path(story_dir)
    # Work on a copy: "area_height" is removed below and the caller's dict
    # (or a shared default) must not be mutated.
    caption_config = dict(caption_config) if caption_config else {}

    sound_dir = story_dir / "sound"
    image_dir = story_dir / "image"
    speech_dir = story_dir / "speech"

    # The silence paddings are identical for every page, so build them once.
    slide_silence = AudioArrayClip(
        np.zeros((int(audio_sample_rate * slide_duration), 2)), fps=audio_sample_rate)
    fade_silence = AudioArrayClip(
        np.zeros((int(audio_sample_rate * fade_duration), 2)), fps=audio_sample_rate)

    video_clips = []
    timestamps = []
    # Position on the final timeline where the current page's clip starts.
    # Pages overlap by slide_duration because of the slide transition.
    page_start = 0.0
    speech_rms = None

    for page in trange(1, num_pages + 1):
        # ----- speech track -----
        single_speech_file = speech_dir / f"p{page}.wav"
        if single_speech_file.exists():
            speech_files = [single_speech_file]
        else:
            speech_files = sorted(
                speech_dir.glob(f"p{page}_*.wav"),
                key=lambda x: int(x.stem.split("_")[-1]),
            )
            if not speech_files:
                raise FileNotFoundError(
                    f"no speech file found for page {page} in {speech_dir}")

        utterance_clips = [
            AudioFileClip(str(speech_file), fps=audio_sample_rate)
            for speech_file in speech_files
        ]

        # One subtitle timestamp per utterance. The first page has no leading
        # slide silence; every later page does. Single-file and multi-file
        # pages share this bookkeeping so they can be mixed freely.
        lead_in = fade_duration if page == 1 else slide_duration + fade_duration
        cursor = page_start + lead_in
        for utterance in utterance_clips:
            timestamps.append([cursor, cursor + utterance.duration])
            cursor += utterance.duration

        padded = [fade_silence] + utterance_clips + [fade_silence, slide_silence]
        if page != 1:
            padded = [slide_silence] + padded
        speech_clip = concatenate_audioclips(padded)
        page_start += speech_clip.duration - slide_duration

        # Energy of the first utterance is the reference for background levels.
        speech_rms = _mean_rms(speech_files[0])

        # ----- image: main content, aligned to the speech duration -----
        image_clip = ImageClip(str(image_dir / f"p{page}.png"))
        image_clip = image_clip.set_duration(speech_clip.duration).set_fps(fps)
        image_clip = image_clip.crossfadein(fade_duration).crossfadeout(fade_duration)

        if random.random() <= 0.5:
            # zoom in or zoom out
            zoom_mode = "in" if random.random() <= 0.5 else "out"
            image_clip = add_zoom_effect(image_clip, zoom_speed, zoom_mode)
        else:
            # move left or right
            direction = "left" if random.random() <= 0.5 else "right"
            image_clip = add_move_effect(image_clip, direction=direction, move_raito=move_ratio)

        # ----- sound track -----
        sound_file = sound_dir / f"p{page}.wav"
        if sound_file.exists():
            sound_clip = AudioFileClip(str(sound_file), fps=audio_sample_rate)
            sound_clip = sound_clip.audio_fadein(fade_duration)
            if sound_clip.duration < speech_clip.duration:
                sound_clip = audio_loop(sound_clip, duration=speech_clip.duration)
            else:
                sound_clip = sound_clip.subclip(0, speech_clip.duration)
            ratio = speech_rms / _mean_rms(sound_file) * bg_speech_ratio
            audio_clip = CompositeAudioClip([
                speech_clip,
                sound_clip.volumex(sound_volume * ratio).audio_fadeout(fade_duration),
            ])
        else:
            audio_clip = speech_clip

        video_clips.append(image_clip.set_audio(audio_clip))

    composite_clip = add_slide_effect(video_clips, slide_duration=slide_duration)

    if "area_height" in caption_config:
        composite_clip = add_bottom_black_area(
            composite_clip, black_area_height=caption_config.pop("area_height"))
    else:
        composite_clip = add_bottom_black_area(composite_clip)

    composite_clip = add_caption(
        captions,
        story_dir / "captions.srt",
        timestamps,
        composite_clip,
        max_single_caption_length,
        **caption_config
    )

    # ----- music track, aligned to the video duration -----
    music_clip = AudioFileClip(str(music_path), fps=audio_sample_rate)
    # NOTE(review): the music level is derived from the LAST page's speech
    # energy only (speech_rms is whatever the final loop iteration left).
    ratio = speech_rms / _mean_rms(music_path) * bg_speech_ratio
    if music_clip.duration < composite_clip.duration:
        music_clip = audio_loop(music_clip, duration=composite_clip.duration)
    else:
        music_clip = music_clip.subclip(0, composite_clip.duration)

    all_audio_clip = CompositeAudioClip(
        [composite_clip.audio, music_clip.volumex(music_volume * ratio)])
    composite_clip = composite_clip.set_audio(all_audio_clip)
    composite_clip.write_videofile(
        str(save_path),
        audio_fps=audio_sample_rate,
        audio_codec=audio_codec,
    )


class VideoComposeAgent:
    """Agent wrapper that renders a story directory into ``output.mp4``."""

    def adjust_caption_config(self, width, height):
        """Derive caption sizing from the image dimensions.

        Args:
            width: Image width in pixels.
            height: Image height in pixels.

        Returns:
            dict: ``"fontsize"`` (2.5% of the mean of width and height) and
            ``"area_height"`` (6% of the height), both truncated to int.
        """
        area_height = int(height * 0.06)
        fontsize = int((width + height) / 2 * 0.025)
        return {
            "fontsize": fontsize,
            "area_height": area_height
        }

    def call(self, pages, config):
        """Compose the final video for ``pages`` using ``config``.

        Reads the image size from ``config["image_generation"]["obj_cfg"]``,
        updates ``config["caption_config"]`` in place with the derived caption
        sizing, and writes ``output.mp4`` into ``config["story_dir"]`` using
        ``music/music.wav`` from the same directory as background music.

        Args:
            pages: Caption texts; their count is used as the page count.
            config: Configuration dict; ``config["slideshow_effect"]`` is
                expanded into keyword arguments of ``compose_video``.
        """
        obj_cfg = config["image_generation"]["obj_cfg"]
        height = obj_cfg["height"]
        width = obj_cfg["width"]
        config["caption_config"].update(self.adjust_caption_config(width, height))

        story_dir = Path(config["story_dir"])
        compose_video(
            story_dir=story_dir,
            save_path=story_dir / "output.mp4",
            captions=pages,
            music_path=story_dir / "music/music.wav",
            num_pages=len(pages),
            audio_sample_rate=config["audio_sample_rate"],
            audio_codec=config["audio_codec"],
            caption_config=config["caption_config"],
            max_single_caption_length=config["max_single_caption_length"],
            **config["slideshow_effect"]
        )