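"""Scrape client-rendered video pages, parse their transcripts into speaker segments,
generate per-segment clips, and index the segments into the FAISS-backed AI assistant."""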
import re
import asyncio
import json
import os
import traceback
from pyppeteer import launch
from bs4 import BeautifulSoup, NavigableString
from ai_config_faiss import get_ai_assistant
from video_utils import get_youtube_video, generate_clips
from typing import Dict, List, Set, Optional
from dataclasses import dataclass, asdict
import logging
# Set the TOKENIZERS_PARALLELISM environment variable
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Configure logging to suppress MoviePy's console output
logging.getLogger("moviepy").setLevel(logging.WARNING)
CACHE_DIR = "cache/"
DB_METADATA_FILE = os.path.join(CACHE_DIR, "db_metadata.json")
SUBJECTS = [
    " 5G ", " AI ", " Innovation ", " Network ", " Enterprise ", " Open RAN ",
    " TechCo ", " B2B ", " API ", " Infrastructure ", " Connectivity "
]
os.makedirs(CACHE_DIR, exist_ok=True)
@dataclass
class TranscriptSegment:
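    """A single speaker turn: per-segment metadata plus the spoken text."""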
    metadata: Dict[str, Optional[str]]
    text: str

@dataclass
class VideoInfo:
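    """Video-level metadata together with its parsed transcript segments."""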
    metadata: Dict[str, Optional[str]]
    transcript: List[TranscriptSegment]

async def get_client_rendered_content(url: str) -> str:
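    """Render the page in headless Chromium (pyppeteer) and return the final HTML."""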
    browser = None
    try:
        browser = await launch()
        page = await browser.newPage()
        await page.goto(url, {'waitUntil': 'networkidle0', 'timeout': 60000})
        await asyncio.sleep(5)
        return await page.content()
    except Exception as e:
        logger.error(f"Error fetching content for {url}: {str(e)}")
        raise
    finally:
        if browser:
            await browser.close()

def extract_text_with_br(element):
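    """Flatten an element's text while keeping <br> markers so speaker headers can be parsed later."""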
    result = ['<br><br>']
    for child in element.descendants:
        if isinstance(child, NavigableString):
            result.append(child.strip())
        elif child.name == 'br':
            result.append('<br>')
    return ''.join(result).strip()

def extract_info(html_content: str) -> Optional[VideoInfo]:
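    """Extract title, date, YouTube id, and transcript from page HTML; returns None when the video is unavailable."""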
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        title = soup.title.string.strip() if soup.title else None
        date_elem = soup.find('p', class_='content-date')
        date = date_elem.find('span', class_='ng-binding').text.strip() if date_elem else None
        youtube_iframe = soup.find('iframe', src=lambda x: x and 'youtube.com' in x)
        youtube_url = youtube_iframe['src'] if youtube_iframe else None
        youtube_id = re.search(r'youtube\.com/embed/([^?]+)', youtube_url).group(1) if youtube_url else None
        if get_youtube_video(CACHE_DIR, youtube_id):
            transcript_elem = soup.find(id='transcript0')
            transcript = extract_text_with_br(transcript_elem) if transcript_elem else None
            return VideoInfo(
                metadata={'title': title, 'date': date, 'youtube_id': youtube_id},
                transcript=parse_transcript(transcript) if transcript else []
            )
        else:
            return None
    except Exception as e:
        logger.error(f"Error extracting information: {str(e)}")
        raise

def read_file(filename: str) -> Optional[str]:
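    """Read a UTF-8 text file, returning None if it does not exist."""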
    try:
        if os.path.exists(filename):
            with open(filename, 'r', encoding='utf-8') as f:
                return f.read()
        return None
    except Exception as e:
        logger.error(f"Error reading file {filename}: {str(e)}")
        raise

def extract_subject_info(text: str) -> List[str]:
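    """Return the SUBJECTS keywords that occur (case-insensitively) in the text."""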
    return [subject for subject in SUBJECTS if subject.lower() in text.lower()]

def extract_speaker_info(segment: str) -> Optional[Dict[str, Optional[str]]]:
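    """Parse a '<br><br>Speaker, Company (HH:MM:SS):<br>' header into speaker, company, and timestamp fields."""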
    pattern = r'<br><br>(?:(?P<speaker>[^,(]+?)(?:,\s*(?P<company>[^(]+?))?)?\s*\((?P<timestamp>\d{2}:\d{2}:\d{2}|\d{2}:\d{2})\):<br>'
    match = re.match(pattern, segment)
    return {key: value.strip() if value else None for key, value in match.groupdict().items()} if match else None

def parse_transcript(content: str) -> List[TranscriptSegment]:
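    """Split transcript markup on speaker headers and build TranscriptSegment objects with timing and subject metadata."""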
    parsed_segments = []
    saved_info = None
    segments = [segment.strip() for segment in re.split(r'(<br><br>.*?\((?:\d{2}:)?\d{2}:\d{2}\):<br>)',
                                                        content) if segment.strip()]
    for i, segment in enumerate(segments):
        speaker_info = extract_speaker_info(segment)
        if speaker_info:
            if speaker_info['speaker']:
                if saved_info:
                    text = segments[i-1] if i > 0 else ""
                    parsed_segments.append(TranscriptSegment(
                        metadata={
                            'speaker': saved_info['speaker'],
                            'company': saved_info['company'],
                            'start_timestamp': saved_info['timestamp'],
                            'end_timestamp': speaker_info['timestamp'],
                            'subjects': extract_subject_info(text)
                        },
                        text=text
                    ))
                saved_info = speaker_info
                if not saved_info['company']:
                    saved_info['company'] = "Unknown"
            else:
                if saved_info:
                    text = segments[i-1] if i > 0 else ""
                    parsed_segments.append(TranscriptSegment(
                        metadata={
                            'speaker': saved_info['speaker'],
                            'company': saved_info['company'],
                            'start_timestamp': saved_info['timestamp'],
                            'end_timestamp': speaker_info['timestamp'],
                            'subjects': extract_subject_info(text)
                        },
                        text=text
                    ))
                    saved_info['timestamp'] = speaker_info['timestamp']
        elif saved_info:
            continue
    if saved_info:
        text = segments[-1]
        parsed_segments.append(TranscriptSegment(
            metadata={
                'speaker': saved_info['speaker'],
                'company': saved_info['company'],
                'start_timestamp': saved_info['timestamp'],
                'end_timestamp': "00:00:00",
                'subjects': extract_subject_info(text)
            },
            text=text
        ))
    return parsed_segments

def get_cached_filename(url: str) -> str:
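    """Map a URL to a filesystem-safe cache file prefix inside CACHE_DIR."""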
    return f"{CACHE_DIR}cached_{url.replace('://', '_').replace('/', '_')}"

async def process_url(url: str) -> Optional[VideoInfo]:
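    """Fetch, extract, and cache VideoInfo for a URL, reusing cached JSON/HTML when present."""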
    try:
        cached_filename = get_cached_filename(url)
        html_filename = f"{cached_filename}.html"
        json_filename = f"{cached_filename}.json"
        if os.path.exists(json_filename):
            logger.info(f"Using cached JSON for {url}")
            with open(json_filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return VideoInfo(
                metadata=data['metadata'],
                transcript=[TranscriptSegment(**segment) for segment in data['transcript']]
            )
        if os.path.exists(html_filename):
            logger.info(f"Using cached HTML for {url}")
            content = read_file(html_filename)
        else:
            logger.info(f"Fetching content from web for {url}")
            content = await get_client_rendered_content(url)
            with open(html_filename, 'w', encoding='utf-8') as f:
                f.write(content)
        info = extract_info(content)
        if info is None:
            # extract_info returns None when the YouTube video could not be retrieved
            logger.warning(f"No video available for {url}")
            return None
        if info.transcript:
            logger.info(f"Generating clips for {url}")
            info_dict = asdict(info)
            info_dict['transcript'] = generate_clips(CACHE_DIR, info_dict)
            info = VideoInfo(
                metadata=info_dict['metadata'],
                transcript=[TranscriptSegment(**segment) for segment in info_dict['transcript']]
            )
            with open(json_filename, 'w', encoding='utf-8') as f:
                json.dump(asdict(info), f, ensure_ascii=False, indent=4)
            logger.info(f"Information extracted and saved to {json_filename}")
        else:
            logger.warning(f"No transcript found for {url}")
        return info
    except Exception:
        logger.error(f"Error processing URL {url}:\n{traceback.format_exc()}")
        return None

async def process_urls(urls: List[str]) -> List[Optional[VideoInfo]]:
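    """Process multiple URLs concurrently."""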
    return await asyncio.gather(*[process_url(url) for url in urls])

def db_save_metadata_sets(processed_urls: Set[str], speakers: Set[str],
                          companies: Dict[str, Set[str]],
                          sentiments: Set[str], subjects: Set[str]):
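    """Persist the metadata sets to DB_METADATA_FILE as JSON."""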
    metadata = {
        'processed_urls': list(processed_urls),
        'speakers': list(speakers),
        'companies': {company: list(speakers) for company, speakers in companies.items()},
        'sentiments': list(sentiments),
        'subjects': list(subjects)
    }
    with open(DB_METADATA_FILE, 'w') as f:
        json.dump(metadata, f, indent=2)

def db_load_metadata_sets() -> tuple:
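    """Load saved metadata sets, falling back to empty sets and the default SUBJECTS."""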
    if os.path.exists(DB_METADATA_FILE):
        with open(DB_METADATA_FILE, 'r') as f:
            metadata = json.load(f)
        return (
            set(metadata.get('processed_urls', [])),
            set(metadata.get('speakers', [])),
            {company: set(speakers) for company, speakers in metadata.get('companies', {}).items()},
            set(metadata.get('sentiments', [])),
            set(metadata.get('subjects', SUBJECTS))
        )
    return set(), set(), {}, set(), set(SUBJECTS)

async def main():
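    """Read URLs from dsp-urls.txt, process new ones, and add their transcript segments to the assistant's knowledge base."""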
    assistant = get_ai_assistant()
    url_file = "dsp-urls.txt"
    if not os.path.exists(url_file):
        logger.error(f"Error: {url_file} not found.")
        return
    processed_urls, speakers, companies, sentiments, subjects = db_load_metadata_sets()
    with open(url_file, 'r') as f:
        urls = [line.strip() for line in f if line.strip()]
    total_urls = len(urls)
    for i, url in enumerate(urls, 1):
        if url in processed_urls:
            logger.info(f"[{i}/{total_urls}] {url} already processed")
            continue
        logger.info(f"[{i}/{total_urls}] Processing {url}")
        info = await process_url(url)
        if info is None:
            logger.warning(f"[{i}/{total_urls}] Failed to process {url}")
            continue
        for entry in info.transcript:
            metadata = {**info.metadata, **entry.metadata}
            company = metadata.get('company')
            speaker = metadata.get('speaker')
            entry_subjects = metadata.get('subjects', [])
            if speaker:
                speakers.add(speaker)
            subjects.update(entry_subjects)
            assistant.add_to_knowledge_base(entry.text, data_type='text', metadata=metadata.copy())
            if company and speaker:
                companies.setdefault(company, set()).add(speaker)
        processed_urls.add(url)
        logger.info(f"[{i}/{total_urls}] Added new url: {url}")
        db_save_metadata_sets(processed_urls, speakers, companies, sentiments, subjects)
        assistant.save()
    logger.info("Processing complete. Check logs for any errors.")

if __name__ == "__main__":
    asyncio.run(main())