|
|
import threading |
|
|
import gradio as gr |
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
|
|
|
|
from modules.collector import fetch_all_channel_videos |
|
|
from modules.db import get_collection |
|
|
from modules.indexer import index_videos |
|
|
|
|
|
|
|
|
# Process-wide cancellation flag shared by the sync generators in this module:
# stop_sync() sets it and sync_channels_from_youtube() clears it on entry.
# Being module-level, every concurrent sync observes the same flag.
stop_event: threading.Event = threading.Event()

# NOTE(review): not referenced by the functions in this section — confirm
# whether it is used elsewhere or was meant to cap the number of fetched batches.
MAX_BATCHES: int = 200
|
|
|
|
|
def stop_sync():
    """Ask any running channel sync to stop at its next checkpoint.

    Sets the module-level ``stop_event``. ``sync_channels_from_youtube`` polls
    it before each channel and ``_refresh_single_channel`` polls it while
    collecting indexing results. The flag is cleared again when the next sync
    starts, so calling this while no sync is running has no lasting effect.
    """
    event = stop_event
    event.set()
|
|
|
|
|
def sync_channels_from_youtube(api_key, channel_urls: list, progress: gr.Progress = None):
    """Sync multiple channels, yielding (progress_message, videos_indexed_in_batch).

    Channels are processed in the given order. The shared ``stop_event`` is
    cleared on entry and checked before each channel; once it is set, no
    further channels are started.

    Args:
        api_key: Credential forwarded to the per-channel refresh.
        channel_urls: Channels to sync, processed in order.
        progress: Optional Gradio progress tracker forwarded to the
            per-channel refresh.

    Yields:
        ``(message, count)`` tuples. Status-only messages carry a count of 0;
        per-channel updates from ``_refresh_single_channel`` are passed through
        unchanged. A final summary message is always yielded, including after
        a stop request.
    """
    # No `global` declaration needed: the module-level event is only used via
    # its methods, never rebound.
    stop_event.clear()

    total_channels = len(channel_urls)
    # Sum of the counts reported by the per-channel refresh; only accurate if
    # that helper yields per-batch increments rather than running totals.
    total_videos = 0

    for idx, channel_url in enumerate(channel_urls, 1):
        if stop_event.is_set():
            yield f"🛑 Stopped before processing channel: {channel_url}", 0
            break

        yield f"🔄 Syncing {channel_url} ({idx}/{total_channels})", 0

        for update_message, batch_count in _refresh_single_channel(api_key, channel_url, progress):
            total_videos += batch_count
            yield update_message, batch_count

    yield f"✅ Finished syncing. Total channels: {total_channels}, total videos: {total_videos}", 0
|
|
|
|
|
|
|
|
def _refresh_single_channel(api_key, channel_url, progress):
    """Fetch all videos of one channel and index them in parallel batches.

    Yields ``(message, newly_indexed)`` tuples, where ``newly_indexed`` is the
    number of videos indexed since the previous yield (0 for status-only
    messages). Callers can therefore sum the second element to obtain a
    correct running total.

    Args:
        api_key: Credential passed to ``fetch_all_channel_videos``.
        channel_url: Channel to refresh; also forwarded to ``index_videos``.
        progress: Optional callable (e.g. a ``gr.Progress``) that receives the
            completed fraction of the channel; skipped when falsy.
    """
    # fetch_all_channel_videos yields (_, batch) pairs; only the batch is used.
    # Materialize each batch so it can be both counted and handed to a worker.
    batches = [list(batch) for _, batch in fetch_all_channel_videos(api_key, channel_url)]
    total_videos = sum(len(batch) for batch in batches)

    if total_videos == 0:
        yield f"{channel_url}: No videos found", 0
        return

    completed_videos = 0
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Keep each future's batch so a None result can be credited with that
        # batch's size (not the whole channel's).
        future_to_batch = {
            executor.submit(index_videos, batch, get_collection(), channel_url=channel_url): batch
            for batch in batches
        }

        for future in as_completed(future_to_batch):
            if stop_event.is_set():
                # Cancel batches that have not started yet; otherwise leaving
                # the `with` block would still wait for every queued batch.
                for pending in future_to_batch:
                    pending.cancel()
                yield "🛑 Stop requested during indexing stage", 0
                break

            try:
                indexed_count = future.result()
            except Exception as e:
                # Best-effort: report the failed batch and keep going.
                indexed_count = 0
                yield f"⚠️ Error indexing {channel_url}: {e}", 0
            else:
                if indexed_count is None:
                    # index_videos reported no count; assume the batch was fully indexed.
                    indexed_count = len(future_to_batch[future])

            completed_videos += indexed_count
            pct = 100.0 * completed_videos / total_videos

            if progress:
                progress(completed_videos / total_videos)

            yield f"{channel_url}: Indexed {completed_videos}/{total_videos} videos — {pct:.1f}%", indexed_count
|
|
|