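"""Scene detection and extraction utilities for video files, built on
PySceneDetect for cut detection, pydub for audio handling, and Pillow for
frame images."""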
from typing import Dict, List, Optional, Tuple, Union

from PIL import Image
from pydantic import BaseModel
from pydub import AudioSegment
from pydub.effects import normalize
from scenedetect import (ContentDetector, FrameTimecode, SceneManager,
                         VideoStream, open_video)


class Scene(BaseModel):
    start: FrameTimecode
    end: FrameTimecode
    stt_res: Optional[Union[Dict, List]] = None
    summary: Optional[Dict] = None

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @classmethod
    def init(
        cls,
        start: FrameTimecode,
        end: FrameTimecode,
        summary: Optional[dict] = None,
    ) -> "Scene":
        return cls(start=start, end=end, summary=summary)

    def conversation(self) -> Union[str, Dict, None]:
        # A self-deployed whisper returns a list of segments; join their
        # text fields into a single transcript.
        if isinstance(self.stt_res, list):
            output_conversation = "\n".join(
                [f"{item.get('text', None)}" for item in self.stt_res]
            )
        else:
            output_conversation = self.stt_res
        return output_conversation


class VideoScenes(BaseModel):
    stream: VideoStream
    audio: Union[AudioSegment, None]
    scenes: List[Scene]
    frame_extraction_interval: int

    class Config:
        """Configuration for this pydantic object."""

        extra = "allow"
        arbitrary_types_allowed = True

    @classmethod
    def load(
        cls,
        video_path: str,
        threshold: int = 27,
        min_scene_len: int = 1,
        frame_extraction_interval: int = 5,
        show_progress: bool = False,
        kernel_size: Optional[int] = None,
    ) -> "VideoScenes":
"""Load a video file. | |
Args: | |
video_path (str): The path of the video file. Only support local file. | |
threshold (int): The scene detection threshold. | |
min_scene_len (int): Once a cut is detected, this long time must pass before a new one can | |
be added to the scene list. Count in seconds, defaults to 1. | |
show_progress (bool, optional): Whether to display the progress bar when processing the video. Defaults to False. | |
""" | |
        video = open_video(video_path)
        scene_manager = SceneManager()
        # Weight hue, saturation, and edge changes; ignore pure luminance
        # changes so lighting shifts alone do not trigger a cut.
        weight = ContentDetector.Components(
            delta_hue=1.0,
            delta_sat=1.0,
            delta_lum=0.0,
            delta_edges=1.0,
        )
        detector_kwargs = dict(
            threshold=threshold,
            min_scene_len=int(video.frame_rate * min_scene_len),
            weights=weight,
        )
        # Only pass kernel_size when given, so the detector can otherwise
        # choose a default based on the video resolution.
        if kernel_size is not None:
            detector_kwargs["kernel_size"] = kernel_size
        scene_manager.add_detector(ContentDetector(**detector_kwargs))
        scene_manager.detect_scenes(video, show_progress=show_progress)
        scenes = scene_manager.get_scene_list(start_in_scene=True)
        try:
            audio = AudioSegment.from_file(video_path)
            audio = normalize(audio)
        except (IndexError, OSError):
            # The video has no audio track, or it could not be read.
            audio = None
        return cls(
            stream=video,
            scenes=[Scene.init(*scene) for scene in scenes],
            audio=audio,
            frame_extraction_interval=frame_extraction_interval,
        )

    def get_video_frames(
        self,
        scene: Union[int, Scene, Tuple[FrameTimecode, FrameTimecode]],
        interval: Optional[int] = None,
    ) -> Tuple[List[Image.Image], List[float]]:
        """Get the frames of a scene.

        Args:
            scene (Union[int, Scene, Tuple[FrameTimecode, FrameTimecode]]): The
                scene to get frames from. Can be the index of the scene, the scene
                object, or a tuple of start and end frame timecodes.
            interval (Optional[int]): The sampling interval in frames. Defaults to
                None, deriving the interval from frame_extraction_interval.

        Raises:
            ValueError: If scene is not an int, Scene, or tuple.

        Returns:
            Tuple[List[Image.Image], List[float]]: The sampled frames of the scene
                and their timestamps in seconds.
        """
        if isinstance(scene, int):
            scene = self.scenes[scene]
            start, end = scene.start, scene.end
        elif isinstance(scene, Scene):
            start, end = scene.start, scene.end
        elif isinstance(scene, tuple):
            start, end = scene
        else:
            raise ValueError(
                f"scene should be int, Scene or tuple, not {type(scene).__name__}"
            )
        self.stream.seek(start)
        frames = []
        time_stamps = []
        if interval is None:
            # Cast to int: a fractional frame rate would make the modulo
            # check below match only index 0.
            interval = int(self.frame_extraction_interval * self.stream.frame_rate)
        scene_len = end.get_frames() - start.get_frames()
        # Cap extraction at roughly ten frames per scene.
        if scene_len / 10 > interval:
            interval = int(scene_len / 10) + 1
        for index in range(scene_len):
            if index % interval == 0:
                # Frames arrive in the decoder's channel order (BGR for the
                # default OpenCV backend); convert if RGB is required.
                f = self.stream.read()
                frames.append(Image.fromarray(f))
                time_stamps.append(self.stream.position.get_seconds())
            else:
                # Skip decoding frames we do not keep.
                self.stream.read(decode=False)
        self.stream.seek(0)
        return frames, time_stamps

    def get_audio_clip(
        self, scene: Union[int, Scene, Tuple[FrameTimecode, FrameTimecode]]
    ) -> Optional[AudioSegment]:
        """Get the audio clip of a scene.

        Args:
            scene (Union[int, Scene, Tuple[FrameTimecode, FrameTimecode]]): The
                scene to get the audio clip from. Can be the index of the scene,
                the scene object, or a tuple of start and end frame timecodes.

        Raises:
            ValueError: If scene is not an int, Scene, or tuple.

        Returns:
            Optional[AudioSegment]: The audio clip of the scene, or None if the
                video has no audio track.
        """
        if self.audio is None:
            return None
        if isinstance(scene, int):
            scene = self.scenes[scene]
            start, end = scene.start, scene.end
        elif isinstance(scene, Scene):
            start, end = scene.start, scene.end
        elif isinstance(scene, tuple):
            start, end = scene
        else:
            raise ValueError(
                f"scene should be int, Scene or tuple, not {type(scene).__name__}"
            )
        # pydub slices audio by milliseconds.
        return self.audio[
            int(start.get_seconds() * 1000) : int(end.get_seconds() * 1000)
        ]

    def __len__(self):
        return len(self.scenes)

    def __iter__(self):
        self.index = 0
        return self

    def __next__(self):
        if self.index >= len(self.scenes):
            raise StopIteration
        scene = self.scenes[self.index]
        self.index += 1
        return scene

    def __getitem__(self, index):
        return self.scenes[index]

    def __setitem__(self, index, value):
        self.scenes[index] = value

    def to_serializable(self) -> dict:
        """Convert VideoScenes to a serializable dictionary."""
        scenes_data = []
        for scene in self.scenes:
            scenes_data.append(
                {
                    "start_frame": scene.start.frame_num,
                    "end_frame": scene.end.frame_num,
                    "stt_res": scene.stt_res,
                    "summary": scene.summary,
                }
            )
        return {
            "video_path": self.stream.path,
            "frame_rate": self.stream.frame_rate,
            "scenes": scenes_data,
            "frame_extraction_interval": self.frame_extraction_interval,
        }

    @classmethod
    def from_serializable(cls, data: dict) -> "VideoScenes":
        """Rebuild VideoScenes from serialized data."""
        video = open_video(data["video_path"])
        try:
            audio = AudioSegment.from_file(data["video_path"])
            audio = normalize(audio)
        except (IndexError, OSError):
            audio = None
        # Rebuild the scenes list from stored frame numbers.
        scenes = []
        for scene_data in data["scenes"]:
            start = FrameTimecode(scene_data["start_frame"], data["frame_rate"])
            end = FrameTimecode(scene_data["end_frame"], data["frame_rate"])
            scene = Scene.init(start, end)
            scene.stt_res = scene_data["stt_res"]
            scene.summary = scene_data["summary"]
            scenes.append(scene)
        return cls(
            stream=video,
            scenes=scenes,
            audio=audio,
            frame_extraction_interval=data["frame_extraction_interval"],
        )
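

# Minimal usage sketch. Assumption: "example.mp4" is a placeholder path for a
# local video file; adjust detection parameters to your footage.
if __name__ == "__main__":
    video = VideoScenes.load("example.mp4", threshold=27, min_scene_len=1)
    print(f"Detected {len(video)} scenes")
    for i, scene in enumerate(video):
        frames, timestamps = video.get_video_frames(i)
        clip = video.get_audio_clip(scene)
        print(
            f"Scene {i}: {scene.start.get_seconds():.1f}s-"
            f"{scene.end.get_seconds():.1f}s, {len(frames)} frames, "
            f"audio={'yes' if clip is not None else 'no'}"
        )
    # Round-trip through the serializable form, e.g. for caching results.
    data = video.to_serializable()
    restored = VideoScenes.from_serializable(data)
    assert len(restored) == len(video)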