Spaces:
Runtime error
Runtime error
File size: 3,373 Bytes
b7c7aa0 dc465b0 b7c7aa0 dc465b0 b7c7aa0 dc465b0 b7c7aa0 dc465b0 b7c7aa0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# video_processor.py
from io import BytesIO
import av
import base64
from PIL import Image
from typing import List
from dataclasses import dataclass
def sample(N, K):
array = list(range(N))
length = len(array)
if K >= length or K<2:
return array
k = length // K
sampled_points = [array[i] for i in range(0, length, k)][:K-1]
sampled_points.append(array[-1])
return sampled_points
def grid_sample(array, N, K):
group_size, remainder = len(array) // K, len(array) % K
sampled_groups = []
for i in range(K):
s = i * group_size + min(i, remainder)
e = s + group_size + (1 if i < remainder else 0)
group = array[s:e]
if N >= len(group):
sampled_groups.append(group)
else:
interval = len(group) // N
sampled_groups.append([group[j * interval] for j in range(N)])
return sampled_groups
@dataclass
class VideoProcessor:
frame_format: str = "JPEG"
frame_limit: int = 10
def _decode(self, video_path: str) -> List[Image.Image]:
frames = []
with av.open(video_path) as container:
src = container.streams.video[0]
time_base = src.time_base
framerate = src.average_rate
for i in sample(src.frames, self.frame_limit):
n = round((i / framerate) / time_base)
container.seek(n, backward=True, stream=src)
frame = next(container.decode(video=0))
im = frame.to_image()
frames.append(im)
return frames
def decode(self, video_path: str) -> List[Image.Image]:
frames = []
container = av.open(video_path)
for i, frame in enumerate(container.decode(video=0)):
if i % self.frame_skip:
continue
im = frame.to_image()
frames.append(im)
return frames
def concatenate(self, frames: List[Image.Image], direction: str = "horizontal") -> Image.Image:
widths, heights = zip(*(frame.size for frame in frames))
if direction == "horizontal":
total_width = sum(widths)
max_height = max(heights)
concatenated_image = Image.new('RGB', (total_width, max_height))
x_offset = 0
for frame in frames:
concatenated_image.paste(frame, (x_offset, 0))
x_offset += frame.width
else:
max_width = max(widths)
total_height = sum(heights)
concatenated_image = Image.new('RGB', (max_width, total_height))
y_offset = 0
for frame in frames:
concatenated_image.paste(frame, (0, y_offset))
y_offset += frame.height
return concatenated_image
def grid_concatenate(self, frames: List[Image.Image], group_size, limit=10) -> List[Image.Image]:
sampled_groups = grid_sample(frames, group_size, limit)
return [self.concatenate(group) for group in sampled_groups]
def to_base64_list(self, images: List[Image.Image]) -> List[str]:
base64_list = []
for image in images:
buffered = BytesIO()
image.save(buffered, format=self.frame_format)
base64_list.append(base64.b64encode(buffered.getvalue()).decode('utf-8'))
return base64_list
|