vikramvasudevan committed on
Commit
e51e296
·
verified ·
1 Parent(s): b9afd09

Upload folder using huggingface_hub

Browse files
app.py CHANGED
@@ -13,6 +13,8 @@ from modules.indexer import index_videos
13
  from modules.answerer import answer_query, LLMAnswer, VideoItem, build_video_html
14
  from dotenv import load_dotenv
15
 
 
 
16
  load_dotenv()
17
 
18
 
@@ -58,52 +60,45 @@ def enable_if_not_none(question):
58
  return enable_component()
59
 
60
 
61
- # -------------------------------
62
- # Fetch & index channels
63
- # -------------------------------
64
- def refresh_channel(api_key, channel_url: str):
65
- all_videos = []
66
- for status, videos_batch in fetch_all_channel_videos(api_key, channel_url):
67
- # enrich each video in the batch with channel_url
68
- for v in videos_batch:
69
- v["channel_url"] = channel_url
70
- all_videos.extend(videos_batch)
71
-
72
- # (optional) log progress
73
- print(channel_url, status, "- total so far:", len(all_videos))
74
-
75
- # index all at once
76
- if all_videos:
77
- print(f"{channel_url} - Adding {len(all_videos)} videos to database")
78
- index_videos(all_videos, get_collection(), channel_url=channel_url)
79
- print(f"{channel_url} - Loaded {len(all_videos)} videos to database")
80
-
81
- return len(all_videos)
82
-
83
-
84
  def index_channels(channel_urls: str):
85
  yield "saving ...", gr.update(), gr.update()
86
  yt_api_key = os.environ["YOUTUBE_API_KEY"]
 
87
  urls = [u.strip() for u in re.split(r"[\n,]+", channel_urls) if u.strip()]
88
- total_videos = sum(refresh_channel(yt_api_key, url) for url in urls)
89
- yield f"βœ… Indexed {total_videos} videos from {len(urls)} channels.", gr.update(
90
- choices=list_channels_radio()
91
- ), list_channels_radio()
92
- return
 
 
 
 
 
 
 
 
93
 
94
 
95
  def refresh_all_channels():
96
  yt_api_key = os.environ["YOUTUBE_API_KEY"]
97
  channels = get_indexed_channels(get_collection())
 
98
  if not channels:
99
  return "⚠️ No channels available to refresh.", list_channels_radio()
100
- total_videos = 0
 
 
101
  for key, val in channels.items():
102
  url = val.get("channel_url") if isinstance(val, dict) else key
103
  if url:
104
- total_videos += refresh_channel(yt_api_key, url)
 
 
 
 
105
  return (
106
- f"πŸ”„ Refreshed {len(channels)} channels, re-indexed {total_videos} videos.",
107
  list_channels_radio(),
108
  )
109
 
@@ -134,8 +129,7 @@ def fetch_channel_html(channel_id: str):
134
  # query your collection/db instead of YouTube API
135
  collection = get_collection()
136
  results = collection.get(
137
- where={"channel_id": channel_id},
138
- include=["documents", "metadatas"]
139
  )
140
 
141
  if not results or not results.get("metadatas"):
@@ -177,7 +171,6 @@ def fetch_channel_html(channel_id: str):
177
  return html
178
 
179
 
180
-
181
  # Delete a channel
182
  # -------------------------------
183
  def delete_channel(channel_url: str):
@@ -252,16 +245,17 @@ with gr.Blocks() as demo:
252
  # Sidebar
253
  with gr.Sidebar() as my_sidebar:
254
  gr.Markdown("### πŸ“Ί Channels")
255
- channel_list_state = gr.State([c for c in list_channels_radio()])
 
256
 
257
  no_channels_message = gr.Markdown(
258
  "⚠️ **No channels available.**",
259
- visible=False if channel_list_state.value else True,
260
  )
261
  channel_radio = gr.Radio(
262
- choices=channel_list_state.value,
263
  label="Select a Channel",
264
- visible=True if channel_list_state.value else False,
265
  )
266
 
267
  with gr.Row():
@@ -272,8 +266,19 @@ with gr.Blocks() as demo:
272
  variant="secondary",
273
  interactive=False,
274
  )
 
 
 
 
 
 
 
275
  refresh_all_btn = gr.Button(
276
- "πŸ”„Refresh", size="sm", scale=0, variant="huggingface"
 
 
 
 
277
  )
278
  add_channels_btn = gr.Button(
279
  "βž• Add", size="sm", scale=0, variant="primary"
@@ -291,6 +296,7 @@ with gr.Blocks() as demo:
291
  outputs=[refresh_status, channel_radio],
292
  )
293
 
 
294
  add_channels_btn.click(close_component, outputs=[my_sidebar]).then(
295
  show_component, outputs=[add_channel_modal]
296
  )
 
13
  from modules.answerer import answer_query, LLMAnswer, VideoItem, build_video_html
14
  from dotenv import load_dotenv
15
 
16
+ from youtube_sync import sync_channels_from_youtube
17
+
18
  load_dotenv()
19
 
20
 
 
60
  return enable_component()
61
 
62
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
def index_channels(channel_urls: str):
    """Index one or more YouTube channels, streaming status messages.

    This is a generator. Every yielded value is a 3-tuple
    ``(status_message, radio_update, channel_list)``.

    Args:
        channel_urls: Channel URLs separated by commas and/or newlines.
            Blank entries are dropped; ``None`` is treated as an empty string.

    Yields:
        An initial "saving ..." tuple, then one tuple per progress message
        produced by ``sync_channels_from_youtube`` (the last two slots are
        no-op ``gr.update()`` placeholders), and finally a summary tuple
        carrying the value of ``list_channels_radio()``.

    Raises:
        KeyError: If ``YOUTUBE_API_KEY`` is missing from the environment
            (raised after the initial status has been yielded).
    """
    yield "saving ...", gr.update(), gr.update()
    yt_api_key = os.environ["YOUTUBE_API_KEY"]

    # `or ""` keeps re.split from raising TypeError when nothing was entered.
    urls = [u.strip() for u in re.split(r"[\n,]+", channel_urls or "") if u.strip()]
    total_videos = 0

    # Stream progress; the second tuple element is the number of videos
    # indexed by that step, so summing it gives the overall total.
    for message, videos_count in sync_channels_from_youtube(yt_api_key, urls):
        total_videos += videos_count
        yield message, gr.update(), gr.update()

    # Query the channel list once so both outputs are guaranteed to agree.
    channels = list_channels_radio()
    yield (
        f"✅ Indexed {total_videos} videos from {len(urls)} channels.",
        gr.update(choices=channels),
        channels,
    )
81
 
82
 
83
def refresh_all_channels():
    """Re-sync every channel already present in the collection.

    Returns:
        A 2-tuple ``(status_message, channels)`` where ``channels`` is the
        value of ``list_channels_radio()`` after the refresh.

    Raises:
        KeyError: If ``YOUTUBE_API_KEY`` is missing from the environment.
    """
    yt_api_key = os.environ["YOUTUBE_API_KEY"]
    channels = get_indexed_channels(get_collection())

    if not channels:
        return "⚠️ No channels available to refresh.", list_channels_radio()

    # An entry is either a dict carrying "channel_url" or keyed by the URL itself.
    urls = []
    for key, val in channels.items():
        url = val.get("channel_url") if isinstance(val, dict) else key
        if url:
            urls.append(url)

    # sync_channels_from_youtube is a generator of (message, videos_indexed)
    # tuples: it does no work until iterated, so it must be consumed here and
    # the per-step counts summed to obtain the real total.
    total_videos = sum(
        videos_count
        for _message, videos_count in sync_channels_from_youtube(yt_api_key, urls)
    )

    return (
        f"🔄 Refreshed {len(urls)} channels, re-indexed {total_videos} videos.",
        list_channels_radio(),
    )
104
 
 
129
  # query your collection/db instead of YouTube API
130
  collection = get_collection()
131
  results = collection.get(
132
+ where={"channel_id": channel_id}, include=["documents", "metadatas"]
 
133
  )
134
 
135
  if not results or not results.get("metadatas"):
 
171
  return html
172
 
173
 
 
174
  # Delete a channel
175
  # -------------------------------
176
  def delete_channel(channel_url: str):
 
245
  # Sidebar
246
  with gr.Sidebar() as my_sidebar:
247
  gr.Markdown("### πŸ“Ί Channels")
248
+ channel_list_values = list_channels_radio()
249
+ channel_list_state = gr.State(channel_list_values)
250
 
251
  no_channels_message = gr.Markdown(
252
  "⚠️ **No channels available.**",
253
+ visible=False if channel_list_values else True,
254
  )
255
  channel_radio = gr.Radio(
256
+ choices=channel_list_values,
257
  label="Select a Channel",
258
+ visible=True if channel_list_values else False,
259
  )
260
 
261
  with gr.Row():
 
266
  variant="secondary",
267
  interactive=False,
268
  )
269
+ refresh_btn = gr.Button(
270
+ "πŸ”„Refresh",
271
+ size="sm",
272
+ scale=0,
273
+ variant="huggingface",
274
+ visible=False,
275
+ )
276
  refresh_all_btn = gr.Button(
277
+ "πŸ”„Refresh",
278
+ size="sm",
279
+ scale=0,
280
+ variant="huggingface",
281
+ visible=False,
282
  )
283
  add_channels_btn = gr.Button(
284
  "βž• Add", size="sm", scale=0, variant="primary"
 
296
  outputs=[refresh_status, channel_radio],
297
  )
298
 
299
+ refresh_btn.click(fn=list_channels_radio, outputs=[channel_radio])
300
  add_channels_btn.click(close_component, outputs=[my_sidebar]).then(
301
  show_component, outputs=[add_channel_modal]
302
  )
modules/collector.py CHANGED
@@ -15,11 +15,11 @@ def fetch_all_channel_videos(api_key: str, channel_url: str, max_results_per_cal
15
 
16
  final_videos = []
17
  for videos in fetch_channel_videos_by_id(api_key, channel_id, max_results_per_call):
18
- final_videos.extend(videos) # extend instead of append
19
  print("Fetched", len(final_videos))
20
- yield (f"Fetched {len(final_videos)}", final_videos)
21
 
22
- yield (f"Fetched {len(final_videos)}", final_videos)
23
 
24
 
25
  def fetch_channel_videos_by_id(api_key: str, channel_id: str, max_results=50):
 
15
 
16
  final_videos = []
17
  for videos in fetch_channel_videos_by_id(api_key, channel_id, max_results_per_call):
18
+ final_videos.extend(videos)
19
  print("Fetched", len(final_videos))
20
+ yield (f"Fetched {len(final_videos)}", videos) # <-- only yield the *new* batch
21
 
22
+ yield (f"Fetched {len(final_videos)}", []) # final "summary"
23
 
24
 
25
  def fetch_channel_videos_by_id(api_key: str, channel_id: str, max_results=50):
modules/indexer.py CHANGED
@@ -2,7 +2,7 @@
2
  from typing import Dict, List
3
  from openai import OpenAI
4
 
5
- def index_videos(videos: List[Dict], collection, channel_url: str, batch_size: int = 20):
6
  client = OpenAI()
7
 
8
  total = len(videos)
@@ -11,7 +11,10 @@ def index_videos(videos: List[Dict], collection, channel_url: str, batch_size: i
11
  # Split into batches
12
  for start in range(0, total, batch_size):
13
  batch = videos[start:start + batch_size]
14
- print(f"[INDEX] Processing batch {start+1} β†’ {start+len(batch)} of {total}")
 
 
 
15
 
16
  # Prepare text inputs
17
  texts = [f"{vid.get('title', '')} - {vid.get('description', '')}" for vid in batch]
@@ -49,6 +52,8 @@ def index_videos(videos: List[Dict], collection, channel_url: str, batch_size: i
49
  ids=ids,
50
  )
51
 
52
- print(f"[INDEX] βœ… Indexed {len(batch)} videos (total so far: {start+len(batch)}/{total})")
53
 
54
  print(f"[INDEX] πŸŽ‰ Finished indexing {total} videos for channel={channel_url}")
 
 
 
2
  from typing import Dict, List
3
  from openai import OpenAI
4
 
5
+ def index_videos(videos: List[Dict], collection, channel_url: str, batch_size: int = 50):
6
  client = OpenAI()
7
 
8
  total = len(videos)
 
11
  # Split into batches
12
  for start in range(0, total, batch_size):
13
  batch = videos[start:start + batch_size]
14
+ end = start + len(batch)
15
+ percent = round((end / total) * 100, 1)
16
+
17
+ print(f"[INDEX] Processing batch {start+1} β†’ {end} of {total} β€” {percent}%")
18
 
19
  # Prepare text inputs
20
  texts = [f"{vid.get('title', '')} - {vid.get('description', '')}" for vid in batch]
 
52
  ids=ids,
53
  )
54
 
55
+ print(f"[INDEX] βœ… Indexed {len(batch)} videos (total so far: {end}/{total} β€” {percent}%)")
56
 
57
  print(f"[INDEX] πŸŽ‰ Finished indexing {total} videos for channel={channel_url}")
58
+ return total
59
+
tests/test_num_records_in_channel.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # modules/test_utils.py
2
+ import asyncio
3
+ import time
4
+ from typing import Optional
5
+ from modules.db import get_collection
6
+
7
+
8
def count_records_for_channel(collection, channel_url: str) -> int:
    """Count the records stored in ``collection`` for one channel.

    Args:
        collection: Object exposing ``get(where=..., include=...)`` that
            returns a mapping with an ``"ids"`` entry.
        channel_url: Channel URL matched against the ``channel_url`` field.

    Returns:
        The number of ids returned for that channel. The count is also printed.

    Raises:
        ValueError: If ``channel_url`` is empty or ``None``.
    """
    if channel_url:
        # Only ids are needed, so nothing extra is requested via `include`.
        matches = collection.get(where={"channel_url": channel_url}, include=[])
        total = len(matches["ids"])
        print(f"[TEST] Channel '{channel_url}' has {total} records in collection.")
        return total
    raise ValueError("channel_url must be provided")
22
+
23
+
24
if __name__ == "__main__":
    # Manual monitoring aid: print the record count for one channel once per
    # second until interrupted.
    collection = get_collection()
    try:
        while True:
            count_records_for_channel(collection, "https://www.youtube.com/@SriYadugiriYathirajaMutt")
            time.sleep(1)
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the loop; exit without a traceback.
        pass
youtube_sync.py ADDED
@@ -0,0 +1,78 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import threading
2
+ import gradio as gr
3
+ from concurrent.futures import ThreadPoolExecutor, as_completed
4
+
5
+ from modules.collector import fetch_all_channel_videos
6
+ from modules.db import get_collection
7
+ from modules.indexer import index_videos
8
+
9
+ # global stop signal
10
+ stop_event = threading.Event()
11
+ MAX_BATCHES = 200 # safety cutoff
12
+
13
def stop_sync():
    """Request that a running sync stop by setting the module-level ``stop_event``."""
    stop_event.set()
16
+
17
def sync_channels_from_youtube(api_key, channel_urls: list, progress: gr.Progress = None):
    """Sync several channels one after another, streaming progress.

    The module-level ``stop_event`` is cleared on entry and checked before
    each channel; ``stop_sync()`` sets it to request cancellation.

    Args:
        api_key: API key forwarded to ``_refresh_single_channel``.
        channel_urls: Channel URLs to sync, processed in order.
        progress: Optional progress callback forwarded to
            ``_refresh_single_channel``.

    Yields:
        ``(progress_message, videos_indexed_in_batch)`` tuples. Purely
        informational messages carry a count of 0, so summing the second
        element gives the total number of videos indexed.
    """
    # stop_event is only mutated, never rebound, so no `global` is needed.
    stop_event.clear()

    channel_urls = list(channel_urls)  # allow any iterable; len() is needed below
    total_channels = len(channel_urls)
    total_videos = 0
    channels_started = 0

    for idx, channel_url in enumerate(channel_urls, 1):
        if stop_event.is_set():
            yield f"🛑 Stopped before processing channel: {channel_url}", 0
            break

        channels_started += 1
        yield f"🔄 Syncing {channel_url} ({idx}/{total_channels})", 0

        # Relay video-level progress from the per-channel generator.
        for update_message, batch_count in _refresh_single_channel(api_key, channel_url, progress):
            total_videos += batch_count
            yield update_message, batch_count

    if stop_event.is_set():
        # Do not claim success (or the full channel count) after a stop.
        yield (
            f"🛑 Sync stopped. Channels started: {channels_started}/{total_channels}, "
            f"total videos: {total_videos}",
            0,
        )
    else:
        yield f"✅ Finished syncing. Total channels: {total_channels}, total videos: {total_videos}", 0
40
+
41
+
42
def _refresh_single_channel(api_key, channel_url, progress):
    """Fetch all videos of one channel and index them, streaming progress.

    All batches are fetched first so the total is known, then each batch is
    handed to ``index_videos`` on a pool of 4 worker threads.

    Args:
        api_key: API key passed to ``fetch_all_channel_videos``.
        channel_url: URL of the channel to refresh.
        progress: Optional callable invoked with the completed fraction
            (0..1) after each finished batch; skipped when falsy.

    Yields:
        ``(message, newly_indexed)`` tuples. ``newly_indexed`` is the number
        of videos indexed since the previous yield (0 for stop/error/"no
        videos" messages), so callers can sum it without double counting.
    """
    # Tag every video with its channel and drop empty batches (the fetcher
    # may emit an empty "summary" batch) so no no-op indexing job is queued.
    batches = []
    for _status, fetched in fetch_all_channel_videos(api_key, channel_url):
        tagged = [video | {"channel_url": channel_url} for video in fetched]
        if tagged:
            batches.append(tagged)
    total_videos = sum(len(batch) for batch in batches)

    if total_videos == 0:
        yield f"{channel_url}: No videos found", 0
        return

    completed_videos = 0
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Map each future to its batch size for the fallback count below.
        batch_sizes = {
            executor.submit(index_videos, batch, get_collection(), channel_url=channel_url): len(batch)
            for batch in batches
        }

        for future in as_completed(batch_sizes):
            if stop_event.is_set():
                # Cancel whatever has not started yet; otherwise leaving the
                # `with` block would still wait for every queued batch.
                for pending in batch_sizes:
                    pending.cancel()
                yield "🛑 Stop requested during indexing stage", 0
                break

            try:
                indexed_count = future.result()
                if indexed_count is None:
                    # index_videos returned nothing: assume the whole batch was indexed.
                    indexed_count = batch_sizes[future]
            except Exception as e:
                # Best effort: report the failed batch and carry on with the rest.
                indexed_count = 0
                yield f"⚠️ Error indexing {channel_url}: {e}", 0

            completed_videos += indexed_count
            pct = 100.0 * completed_videos / total_videos

            if progress:
                progress(completed_videos / total_videos)

            # The message shows the running total; the count is this batch only.
            yield f"{channel_url}: Indexed {completed_videos}/{total_videos} videos — {pct:.1f}%", indexed_count