# Webpage video preprocessor worker: scene detection + per-scene STT + LLM scene
# summaries, with on-disk caching so interrupted runs can be resumed.
import hashlib
import pickle
import time
from pathlib import Path
from typing import List, Optional, Union

import json_repair
from pydantic import Field, field_validator
from pydub import AudioSegment
from pydub.effects import normalize
from scenedetect import open_video

from omagent_core.engine.worker.base import BaseWorker
from omagent_core.models.asr.stt import STT
# from omagent_core.models.encoders.openai_encoder import OpenaiTextEmbeddingV3
from omagent_core.models.encoders.azure_encoder import AzureTextEmbeddingV3
from omagent_core.models.llms.base import BaseLLMBackend
from omagent_core.models.llms.prompt import PromptTemplate
from omagent_core.utils.registry import registry

from ..misc.scene import VideoScenes
# Directory containing this module; the prompt template files
# (sys_prompt.prompt / user_prompt.prompt) are loaded relative to it.
# `root_path` is an alias kept for backward compatibility with any importer.
CURRENT_PATH = root_path = Path(__file__).parents[0]
class WebpageVideoPreprocessor(BaseLLMBackend, BaseWorker):
    """Worker that preprocesses a video for downstream question answering.

    Pipeline per video:
      1. Compute the MD5 of the file (propagated to downstream workers).
      2. If ``use_cache`` and a cache file exists, reload the pickled scene
         list and only re-run the LLM for scenes whose ``summary`` is None
         (i.e. resume an interrupted transfer).
      3. Otherwise detect scenes, transcribe each scene's audio with STT,
         summarize each scene with the LLM, embed the summary text and
         store it in long-term memory (``self.ltm``), then cache the scenes.

    Fixes vs. the previous revision:
      * ``validate_asr`` is now actually registered as a pydantic validator
        (the ``@field_validator`` decorator was missing).
      * The cache branch no longer leaves ``video`` undefined when
        ``use_cache`` is False but a cache file happens to exist.
      * LLM output arriving as a JSON *string* is parsed with
        ``json_repair`` (previously ``.get`` was called directly on it).
    """

    prompts: List[PromptTemplate] = Field(
        default=[
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("sys_prompt.prompt"), role="system"
            ),
            PromptTemplate.from_file(
                CURRENT_PATH.joinpath("user_prompt.prompt"), role="user"
            ),
        ]
    )
    text_encoder: AzureTextEmbeddingV3
    stt: STT
    # Scene detection / frame sampling knobs (passed through to VideoScenes.load).
    scene_detect_threshold: Union[float, int] = 27
    min_scene_len: int = 1
    frame_extraction_interval: int = 5
    kernel_size: Optional[int] = None
    show_progress: bool = True
    # On-disk cache of the pickled scene list, keyed by the video file name.
    use_cache: bool = True
    cache_dir: str = "./video_cache"

    @field_validator("stt", mode="before")
    @classmethod
    def validate_asr(cls, stt):
        """Accept either an STT instance or a config dict for the ``stt`` field.

        FIX: previously this method carried no decorator, so pydantic never
        invoked it and it was dead code.
        """
        if isinstance(stt, STT):
            return stt
        elif isinstance(stt, dict):
            return STT(**stt)
        else:
            raise ValueError("Invalid STT type.")

    def calculate_md5(self, file_path):
        """Return the hex MD5 digest of ``file_path``.

        Reads in 4 KiB chunks so large videos never have to fit in memory.
        """
        md5_hash = hashlib.md5()
        with open(file_path, "rb") as file:
            for byte_block in iter(lambda: file.read(4096), b""):
                md5_hash.update(byte_block)
        return md5_hash.hexdigest()

    def _summarize_scene(self, scene, video_frames):
        """Run the LLM over one scene's transcript + frames; return the summary dict.

        The prompts request structured JSON; if the backend hands the content
        back as raw text, repair-parse it (json_repair tolerates malformed
        JSON). Dict responses are passed through unchanged.
        """
        chat_complete_res = self.infer(
            input_list=[
                {
                    "stt_res": scene.conversation,
                    "img_placeholders": "".join(
                        [f"<image_{i}>" for i in range(len(video_frames))]
                    ),
                }
            ],
            images=video_frames,
        )
        raw = chat_complete_res[0]["choices"][0]["message"]["content"]
        return raw if isinstance(raw, dict) else json_repair.loads(raw)

    def _index_scene(self, index, scene, video_md5):
        """Embed ``scene.summary`` as text and store it in long-term memory."""
        summary = scene.summary
        # (fixed a stray quote in the original "Character'" label)
        content = (
            f"Time in video: {summary.get('time', 'null')}\n"
            f"Location: {summary.get('location', 'null')}\n"
            f"Character: {summary.get('character', 'null')}\n"
            f"Events: {summary.get('events', [])}\n"
            f"Scene: {summary.get('scene', [])}\n"
            f"Summary: {summary.get('summary', '')}"
        )
        content_vector = self.text_encoder.infer([content])[0]
        self.ltm[index] = {
            "value": {
                "video_md5": video_md5,
                "content": content,
                "start_time": scene.start.get_seconds(),
                "end_time": scene.end.get_seconds(),
            },
            "embedding": content_vector,
        }

    def _run(self, video_path: str, *args, **kwargs):
        """
        Process a video file.

        Args:
            video_path (str): Path to the input video file.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments (``video_md5`` is injected).

        Returns:
            dict: ``video_md5``, ``video_path`` and ``instance_id``.
        """
        video_md5 = self.calculate_md5(video_path)
        kwargs["video_md5"] = video_md5
        # NOTE(review): the cache is keyed by file *name*, not by the md5 —
        # two different videos sharing a name collide. Kept for cache
        # compatibility; consider keying on video_md5 instead.
        cache_path = (
            Path(self.cache_dir)
            .joinpath(Path(video_path).name.replace("/", "-"))
            .joinpath("video_cache.pkl")
        )
        if self.use_cache and cache_path.exists():
            # --- Resume from cache: only scenes with summary=None are redone.
            with open(cache_path, "rb") as f:
                loaded_scene = pickle.load(f)
            try:
                audio = normalize(AudioSegment.from_file(video_path))
            except Exception:
                # Videos without a decodable audio track are processed silently.
                audio = None
            video = VideoScenes(
                stream=open_video(video_path),
                audio=audio,
                scenes=loaded_scene,
                frame_extraction_interval=self.frame_extraction_interval,
            )
            self.callback.send_block(
                agent_id=self.workflow_instance_id,
                msg="Loaded video scenes from cache.\nResume the interrupted transfer for results with scene.summary of None.",
            )
            for index, scene in enumerate(video.scenes):
                if scene.summary is None:
                    self.callback.send_block(
                        agent_id=self.workflow_instance_id,
                        msg=f"Resume the interrupted transfer for scene {index}.",
                    )
                    video_frames, time_stamps = video.get_video_frames(scene)
                    try:
                        scene.summary = self._summarize_scene(scene, video_frames)
                        self._index_scene(index, scene, video_md5)
                    except Exception as e:
                        self.callback.error(
                            f"Failed to resume scene {index}: {e}. Set to default."
                        )
                        # Empty-but-complete summary: stops further resume retries.
                        scene.summary = {
                            "time": "",
                            "location": "",
                            "character": "",
                            "events": [],
                            "scene": [],
                            "summary": "",
                        }
            self.stm(self.workflow_instance_id)["video"] = video.to_serializable()
            # Re-cache so freshly resumed summaries survive another interruption.
            with open(cache_path, "wb") as f:
                pickle.dump(video.scenes, f)
        else:
            # --- Fresh processing.
            # FIX: this branch was guarded by `if not cache_path.exists():`,
            # which left `video` undefined when use_cache=False while a stale
            # cache file existed on disk.
            video = VideoScenes.load(
                video_path=video_path,
                threshold=self.scene_detect_threshold,
                min_scene_len=self.min_scene_len,
                frame_extraction_interval=self.frame_extraction_interval,
                show_progress=self.show_progress,
                kernel_size=self.kernel_size,
            )
            self.stm(self.workflow_instance_id)["video"] = video.to_serializable()
            for index, scene in enumerate(video.scenes):
                print(f"Processing scene {index} / {len(video.scenes)}...")
                audio_clip = video.get_audio_clip(scene)
                if audio_clip is None:
                    scene.stt_res = {"text": ""}
                else:
                    scene.stt_res = self.stt.infer(audio_clip)
                video_frames, time_stamps = video.get_video_frames(scene)
                try:
                    # Best-effort face annotation when the tool is registered;
                    # any failure (tool absent, inference error) is ignored.
                    face_rec = registry.get_tool("FaceRecognition")
                    for frame in video_frames:
                        objs = face_rec.infer(frame)
                        face_rec.visual_prompting(frame, objs)
                except Exception:
                    pass
                try:
                    scene.summary = self._summarize_scene(scene, video_frames)
                    self._index_scene(index, scene, video_md5)
                except Exception as e:
                    self.callback.error(f"Failed to process scene {index}: {e}")
                    # None marks the scene for retry on a cached resume run.
                    scene.summary = None
            if self.use_cache and not cache_path.exists():
                cache_path.parent.mkdir(parents=True, exist_ok=True)
                with open(cache_path, "wb") as f:
                    pickle.dump(video.scenes, f)
        return {
            "video_md5": video_md5,
            "video_path": video_path,
            "instance_id": self.workflow_instance_id,
        }