"""Crawl video pages, extract YouTube transcripts with pyppeteer/BeautifulSoup,
generate clips, and index the transcript segments into the FAISS-backed
AI assistant's knowledge base."""

import asyncio
import json
import logging
import os
import re
import traceback
from dataclasses import asdict, dataclass
from typing import Dict, List, Optional, Set

from bs4 import BeautifulSoup, NavigableString
from pyppeteer import launch

from ai_config_faiss import get_ai_assistant
from video_utils import generate_clips, get_youtube_video

# Silence tokenizer fork-parallelism warnings (HuggingFace tokenizers).
os.environ["TOKENIZERS_PARALLELISM"] = "false"


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

logging.getLogger("moviepy").setLevel(logging.WARNING)


CACHE_DIR = "cache/"
DB_METADATA_FILE = os.path.join(CACHE_DIR, "db_metadata.json")

# Subject keywords matched case-insensitively against transcript text; the
# surrounding spaces reduce accidental substring hits.
SUBJECTS = [
    " 5G ", " AI ", " Innovation ", " Network ", " Enterprise ", " Open RAN ",
    " TechCo ", " B2B ", " API ", " Infrastructure ", " Connectivity "
]

os.makedirs(CACHE_DIR, exist_ok=True)


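# Typed containers for the parsed page: one TranscriptSegment per speaker turn,
# wrapped in a VideoInfo together with the video-level metadata.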
@dataclass
class TranscriptSegment:
    metadata: Dict[str, Optional[str]]
    text: str


@dataclass
class VideoInfo:
    metadata: Dict[str, Optional[str]]
    transcript: List[TranscriptSegment]


async def get_client_rendered_content(url: str) -> str:
    """Fetch a page with headless Chromium (pyppeteer) so client-side rendered
    content is present in the returned HTML."""
    browser = None
    try:
        browser = await launch()
        page = await browser.newPage()
        await page.goto(url, {'waitUntil': 'networkidle0', 'timeout': 60000})
        await asyncio.sleep(5)  # allow late-loading widgets to settle
        return await page.content()
    except Exception as e:
        logger.error(f"Error fetching content for {url}: {str(e)}")
        raise
    finally:
        if browser:
            await browser.close()


def extract_text_with_br(element) -> str:
    """Flatten an element's text while keeping <br> tags as literal markers,
    so the transcript can later be split on speaker headers."""
    result = ['<br><br>']
    for child in element.descendants:
        if isinstance(child, NavigableString):
            result.append(child.strip())
        elif child.name == 'br':
            result.append('<br>')
    return ''.join(result).strip()


def extract_info(html_content: str) -> Optional[VideoInfo]:
    """Parse the rendered page into a VideoInfo, or return None when the
    YouTube video cannot be retrieved."""
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        title = soup.title.string.strip() if soup.title else None
        date_elem = soup.find('p', class_='content-date')
        date = date_elem.find('span', class_='ng-binding').text.strip() if date_elem else None
        youtube_iframe = soup.find('iframe', src=lambda x: x and 'youtube.com' in x)
        youtube_url = youtube_iframe['src'] if youtube_iframe else None
        youtube_id = re.search(r'youtube\.com/embed/([^?]+)', youtube_url).group(1) if youtube_url else None

        if not get_youtube_video(CACHE_DIR, youtube_id):
            return None

        transcript_elem = soup.find(id='transcript0')
        transcript = extract_text_with_br(transcript_elem) if transcript_elem else None
        return VideoInfo(
            metadata={'title': title, 'date': date, 'youtube_id': youtube_id},
            transcript=parse_transcript(transcript) if transcript else []
        )
    except Exception as e:
        logger.error(f"Error extracting information: {str(e)}")
        raise


def read_file(filename: str) -> Optional[str]:
    try:
        if os.path.exists(filename):
            with open(filename, 'r', encoding='utf-8') as f:
                return f.read()
        return None
    except Exception as e:
        logger.error(f"Error reading file {filename}: {str(e)}")
        raise


def extract_subject_info(text: str) -> List[str]:
    # Case-insensitive substring match against the padded SUBJECTS keywords.
    return [subject for subject in SUBJECTS if subject.lower() in text.lower()]


def extract_speaker_info(segment: str) -> Optional[Dict[str, Optional[str]]]:
    """Match a speaker header such as '<br><br>Jane Doe, Acme (00:01:23):<br>'
    and return its speaker, company and timestamp fields (None where absent)."""
    pattern = r'<br><br>(?:(?P<speaker>[^,(]+?)(?:,\s*(?P<company>[^(]+?))?)?\s*\((?P<timestamp>\d{2}:\d{2}:\d{2}|\d{2}:\d{2})\):<br>'
    match = re.match(pattern, segment)
    return {key: value.strip() if value else None for key, value in match.groupdict().items()} if match else None


def parse_transcript(content: str) -> List[TranscriptSegment]:
    """Split flattened transcript HTML into per-speaker TranscriptSegments.

    After re.split with a capturing group, `segments` interleaves speaker
    headers with the text spoken until the next header.
    """
    parsed_segments: List[TranscriptSegment] = []
    saved_info = None

    def append_segment(text: str, end_timestamp: str) -> None:
        parsed_segments.append(TranscriptSegment(
            metadata={
                'speaker': saved_info['speaker'],
                'company': saved_info['company'],
                'start_timestamp': saved_info['timestamp'],
                'end_timestamp': end_timestamp,
                'subjects': extract_subject_info(text)
            },
            text=text
        ))

    segments = [segment.strip() for segment in
                re.split(r'(<br><br>.*?\((?:\d{2}:)?\d{2}:\d{2}\):<br>)', content)
                if segment.strip()]

    for i, segment in enumerate(segments):
        speaker_info = extract_speaker_info(segment)
        if not speaker_info:
            continue  # plain text segment; consumed when the next header is seen
        if speaker_info['speaker']:
            # New speaker header: flush the previous speaker's text.
            if saved_info:
                append_segment(segments[i - 1] if i > 0 else "", speaker_info['timestamp'])
            saved_info = speaker_info
            if not saved_info['company']:
                saved_info['company'] = "Unknown"
        elif saved_info:
            # Timestamp-only header: close the current segment and carry on
            # with the same speaker from the new timestamp.
            append_segment(segments[i - 1] if i > 0 else "", speaker_info['timestamp'])
            saved_info['timestamp'] = speaker_info['timestamp']

    if saved_info:
        # The final speaker's text runs to the end of the transcript.
        append_segment(segments[-1], "00:00:00")

    return parsed_segments


def get_cached_filename(url: str) -> str:
    return f"{CACHE_DIR}cached_{url.replace('://', '_').replace('/', '_')}"


async def process_url(url: str) -> Optional[VideoInfo]:
    try:
        cached_filename = get_cached_filename(url)
        html_filename = f"{cached_filename}.html"
        json_filename = f"{cached_filename}.json"

        if os.path.exists(json_filename):
            logger.info(f"Using cached JSON for {url}")
            with open(json_filename, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return VideoInfo(
                metadata=data['metadata'],
                transcript=[TranscriptSegment(**segment) for segment in data['transcript']]
            )

        if os.path.exists(html_filename):
            logger.info(f"Using cached HTML for {url}")
            content = read_file(html_filename)
        else:
            logger.info(f"Fetching content from web for {url}")
            content = await get_client_rendered_content(url)
            with open(html_filename, 'w', encoding='utf-8') as f:
                f.write(content)

        info = extract_info(content)
        if info is None:
            logger.warning(f"No downloadable video found for {url}")
            return None

        if info.transcript:
            logger.info(f"Generating clips for {url}")
            info_dict = asdict(info)
            info_dict['transcript'] = generate_clips(CACHE_DIR, info_dict)
            info = VideoInfo(
                metadata=info_dict['metadata'],
                transcript=[TranscriptSegment(**segment) for segment in info_dict['transcript']]
            )

            with open(json_filename, 'w', encoding='utf-8') as f:
                json.dump(asdict(info), f, ensure_ascii=False, indent=4)

            logger.info(f"Information extracted and saved to {json_filename}")
        else:
            logger.warning(f"No transcript found for {url}")

        return info

    except Exception:
        logger.error(f"Error processing URL {url}:\n{traceback.format_exc()}")
        return None


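# Concurrent helper for batch processing; note that main() below awaits
# process_url() for one URL at a time.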
async def process_urls(urls: List[str]) -> List[Optional[VideoInfo]]:
    return await asyncio.gather(*[process_url(url) for url in urls])


def db_save_metadata_sets(processed_urls: Set[str], speakers: Set[str],
                          companies: Dict[str, Set[str]],
                          sentiments: Set[str], subjects: Set[str]) -> None:
    metadata = {
        'processed_urls': list(processed_urls),
        'speakers': list(speakers),
        'companies': {company: list(members) for company, members in companies.items()},
        'sentiments': list(sentiments),
        'subjects': list(subjects)
    }

    with open(DB_METADATA_FILE, 'w') as f:
        json.dump(metadata, f, indent=2)


def db_load_metadata_sets() -> tuple:
    if os.path.exists(DB_METADATA_FILE):
        with open(DB_METADATA_FILE, 'r') as f:
            metadata = json.load(f)

        return (
            set(metadata.get('processed_urls', [])),
            set(metadata.get('speakers', [])),
            {company: set(speakers) for company, speakers in metadata.get('companies', {}).items()},
            set(metadata.get('sentiments', [])),
            set(metadata.get('subjects', SUBJECTS))
        )

    return set(), set(), {}, set(), set(SUBJECTS)


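# Orchestration: read the URL list, process URLs not yet seen, index each
# transcript segment into the assistant, and persist crawl metadata.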
async def main():
    assistant = get_ai_assistant()
    url_file = "dsp-urls.txt"

    if not os.path.exists(url_file):
        logger.error(f"Error: {url_file} not found.")
        return

    processed_urls, speakers, companies, sentiments, subjects = db_load_metadata_sets()

    with open(url_file, 'r') as f:
        urls = [line.strip() for line in f if line.strip()]

    total_urls = len(urls)
    for i, url in enumerate(urls, 1):
        if url in processed_urls:
            logger.info(f"[{i}/{total_urls}] {url} already processed")
            continue

        logger.info(f"[{i}/{total_urls}] Processing {url}")
        info = await process_url(url)
        if info is None:
            logger.warning(f"[{i}/{total_urls}] Failed to process {url}")
            continue

        for entry in info.transcript:
            metadata = {**info.metadata, **entry.metadata}
            company = metadata.get('company')
            speaker = metadata.get('speaker')
            entry_subjects = metadata.get('subjects', [])

            if speaker:
                speakers.add(speaker)
            subjects.update(entry_subjects)

            assistant.add_to_knowledge_base(entry.text, data_type='text', metadata=metadata.copy())

            if company and speaker:
                companies.setdefault(company, set()).add(speaker)

        processed_urls.add(url)
        logger.info(f"[{i}/{total_urls}] Added new URL: {url}")

    db_save_metadata_sets(processed_urls, speakers, companies, sentiments, subjects)
    assistant.save()

    logger.info("Processing complete. Check logs for any errors.")


if __name__ == "__main__":
    asyncio.run(main())