import hashlib
import pickle
from pathlib import Path
from typing import List, Optional, Union

import json_repair
from omagent_core.engine.worker.base import BaseWorker
from omagent_core.models.asr.stt import STT

# from omagent_core.models.encoders.openai_encoder import OpenaiTextEmbeddingV3
from omagent_core.models.encoders.azure_encoder import AzureTextEmbeddingV3
from omagent_core.models.llms.base import BaseLLMBackend
from omagent_core.models.llms.prompt import PromptTemplate
from omagent_core.utils.registry import registry
from pydantic import Field, field_validator
from pydub import AudioSegment
from pydub.effects import normalize
from scenedetect import open_video

from ..misc.scene import VideoScenes

CURRENT_PATH = root_path = Path(__file__).parents[0]


@registry.register_worker()
class WebpageVideoPreprocessor(BaseLLMBackend, BaseWorker):
    """Preprocess a webpage video: split it into scenes, transcribe the audio,
    summarize each scene with a multimodal LLM, and cache the results."""

    prompts: List[PromptTemplate] = Field(
        default=[
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system"
            ),
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
            ),
        ]
    )
    text_encoder: AzureTextEmbeddingV3
    stt: STT
    scene_detect_threshold: Union[float, int] = 27
    min_scene_len: int = 1
    frame_extraction_interval: int = 5
    kernel_size: Optional[int] = None
    show_progress: bool = True

    use_cache: bool = True
    cache_dir: str = "./video_cache"

    @field_validator("stt", mode="before")
    @classmethod
    def validate_asr(cls, stt):
        if isinstance(stt, STT):
            return stt
        elif isinstance(stt, dict):
            return STT(**stt)
        else:
            raise ValueError("Invalid STT type.")

    def calculate_md5(self, file_path):
        """Compute the MD5 digest of a file, reading it in 4 KiB chunks so
        large videos never need to fit in memory."""
        md5_hash = hashlib.md5()
        with open(file_path, "rb") as file:
            for byte_block in iter(lambda: file.read(4096), b""):
                md5_hash.update(byte_block)
        return md5_hash.hexdigest()

    def _summarize_scene(self, scene, index, video_frames, video_md5):
        """Summarize one scene with the multimodal LLM, embed a textual digest
        of the summary, and store both in long-term memory. Raises on failure
        so the caller can decide how to degrade."""
        chat_complete_res = self.infer(
            input_list=[
                {
                    "stt_res": scene.conversation,
                    # One placeholder token per extracted frame. NOTE: the
                    # "<image_{i}>" token format is an assumption; the literal
                    # placeholder text was stripped from the source.
                    "img_placeholders": "".join(
                        [f"<image_{i}>" for i in range(len(video_frames))]
                    ),
                }
            ],
            images=video_frames,
        )
        # Parse the LLM reply into a dict: the .get() accesses below require
        # one, and json_repair tolerates slightly malformed JSON output.
        scene.summary = json_repair.loads(
            chat_complete_res[0]["choices"][0]["message"]["content"]
        )

        scene_info = scene.summary.get("scene", [])
        events = scene.summary.get("events", [])
        start_time = scene.start.get_seconds()
        end_time = scene.end.get_seconds()
        content = (
            f"Time in video: {scene.summary.get('time', 'null')}\n"
            f"Location: {scene.summary.get('location', 'null')}\n"
            f"Character: {scene.summary.get('character', 'null')}\n"
            f"Events: {events}\n"
            f"Scene: {scene_info}\n"
            f"Summary: {scene.summary.get('summary', '')}"
        )
        content_vector = self.text_encoder.infer([content])[0]
        self.ltm[index] = {
            "value": {
                "video_md5": video_md5,
                "content": content,
                "start_time": start_time,
                "end_time": end_time,
            },
            "embedding": content_vector,
        }

    def _run(self, video_path: str, *args, **kwargs):
        """Process a video file.

        1. Calculate the MD5 hash of the input video for caching.
        2. Load the video from cache if available and use_cache=True, resuming
           any scene whose summary is still missing.
        3. Otherwise, process the video by:
           - extracting the audio and video streams,
           - detecting scene boundaries,
           - extracting frames at the specified interval,
           - generating scene summaries with the LLM,
           - caching the results for future use.

        Args:
            video_path (str): Path to the input video file.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            dict: Dictionary containing video_md5, video_path and instance_id.
        """
        video_md5 = self.calculate_md5(video_path)
        kwargs["video_md5"] = video_md5

        cache_path = (
            Path(self.cache_dir)
            .joinpath(Path(video_path).name.replace("/", "-"))
            .joinpath("video_cache.pkl")
        )

        if self.use_cache and cache_path.exists():
            # Load the detected scenes from cache and resume any scene whose
            # summary is None (e.g. after an interrupted run).
            with open(cache_path, "rb") as f:
                loaded_scene = pickle.load(f)
            try:
                audio = AudioSegment.from_file(video_path)
                audio = normalize(audio)
            except Exception:
                audio = None
            video = VideoScenes(
                stream=open_video(video_path),
                audio=audio,
                scenes=loaded_scene,
                frame_extraction_interval=self.frame_extraction_interval,
            )
            self.callback.send_block(
                agent_id=self.workflow_instance_id,
                msg=(
                    "Loaded video scenes from cache.\n"
                    "Resuming scenes whose summary is still None."
                ),
            )
            for index, scene in enumerate(video.scenes):
                if scene.summary is None:
                    self.callback.send_block(
                        agent_id=self.workflow_instance_id,
                        msg=f"Resuming interrupted processing for scene {index}.",
                    )
                    video_frames, time_stamps = video.get_video_frames(scene)
                    try:
                        self._summarize_scene(scene, index, video_frames, video_md5)
                    except Exception as e:
                        self.callback.error(
                            f"Failed to resume scene {index}: {e}. Set to default."
                        )
                        scene.summary = {
                            "time": "",
                            "location": "",
                            "character": "",
                            "events": [],
                            "scene": [],
                            "summary": "",
                        }
            self.stm(self.workflow_instance_id)["video"] = video.to_serializable()
            # Re-cache so the resumed summaries are persisted.
            with open(cache_path, "wb") as f:
                pickle.dump(video.scenes, f)
        else:
            # Process the video from scratch.
            video = VideoScenes.load(
                video_path=video_path,
                threshold=self.scene_detect_threshold,
                min_scene_len=self.min_scene_len,
                frame_extraction_interval=self.frame_extraction_interval,
                show_progress=self.show_progress,
                kernel_size=self.kernel_size,
            )
            self.stm(self.workflow_instance_id)["video"] = video.to_serializable()
            for index, scene in enumerate(video.scenes):
                print(f"Processing scene {index} / {len(video.scenes)}...")
                audio_clip = video.get_audio_clip(scene)
                if audio_clip is None:
                    scene.stt_res = {"text": ""}
                else:
                    scene.stt_res = self.stt.infer(audio_clip)
                video_frames, time_stamps = video.get_video_frames(scene)
                # Annotate faces in the frames when a FaceRecognition tool is
                # registered; skip silently otherwise.
                try:
                    face_rec = registry.get_tool("FaceRecognition")
                    for frame in video_frames:
                        objs = face_rec.infer(frame)
                        face_rec.visual_prompting(frame, objs)
                except Exception:
                    pass
                try:
                    self._summarize_scene(scene, index, video_frames, video_md5)
                except Exception as e:
                    self.callback.error(f"Failed to process scene {index}: {e}")
                    scene.summary = None
            if self.use_cache and not cache_path.exists():
                cache_path.parent.mkdir(parents=True, exist_ok=True)
                with open(cache_path, "wb") as f:
                    pickle.dump(video.scenes, f)

        return {
            "video_md5": video_md5,
            "video_path": video_path,
            "instance_id": self.workflow_instance_id,
        }
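

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the worker): the caching contract above in
# miniature, runnable on any local file. The worker itself is driven by the
# OmAgent workflow engine rather than invoked directly; the file name
# "cache_demo.bin" and the placeholder payload below are hypothetical and
# exist only for this example.
if __name__ == "__main__":
    import tempfile

    # Stand-in for a video file.
    sample = Path(tempfile.gettempdir()) / "cache_demo.bin"
    sample.write_bytes(b"example payload")

    # Chunked MD5, identical to calculate_md5 above. The digest tags the
    # long-term-memory entries so retrieved scenes can be traced back to the
    # exact bytes of the input video.
    md5_hash = hashlib.md5()
    with open(sample, "rb") as file:
        for byte_block in iter(lambda: file.read(4096), b""):
            md5_hash.update(byte_block)
    print(f"video_md5 = {md5_hash.hexdigest()}")

    # Cache layout keyed by file name: <cache_dir>/<file name>/video_cache.pkl.
    # A cache hit on a later run skips scene detection, STT and LLM
    # summarization entirely.
    cache_path = (
        Path("./video_cache")
        .joinpath(sample.name.replace("/", "-"))
        .joinpath("video_cache.pkl")
    )
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    with open(cache_path, "wb") as f:
        pickle.dump([], f)  # an empty scene list as a placeholder payload
    print(f"cache written to {cache_path}")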