# aiws/search_engine.py
"""Search engine with enhanced content processing: summarization and topic extraction."""
from typing import Dict, List, Any
import requests
from bs4 import BeautifulSoup
from transformers import pipeline
from langchain_community.embeddings import HuggingFaceEmbeddings
import time
import json
import os
from urllib.parse import urlparse, quote_plus
import logging
import random
import re
logger = logging.getLogger(__name__)
class SearchResult:
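    # NOTE: this simple result container is not used within this module;
    # search_duckduckgo returns plain dicts instead.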
def __init__(self, title: str, link: str, snippet: str):
self.title = title
self.link = link
self.snippet = snippet
class ModelManager:
"""Manages different AI models for specific tasks"""
def __init__(self):
self.device = "cpu"
self.models = {}
self.load_models()
def load_models(self):
# Use smaller models for CPU deployment
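        # NOTE: facebook/bart-base is not fine-tuned for summarization; a distilled
        # summarization checkpoint (e.g. "sshleifer/distilbart-cnn-12-6") may give
        # noticeably better summaries while staying CPU-friendly.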
self.models['summarizer'] = pipeline(
"summarization",
model="facebook/bart-base",
device=self.device
)
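        # NOTE: the embedding model below is loaded but not referenced anywhere else
        # in this module; it is presumably consumed by other components of the app.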
self.models['embeddings'] = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2",
model_kwargs={"device": self.device}
)
class ContentProcessor:
"""Processes and analyzes different types of content"""
def __init__(self):
self.model_manager = ModelManager()
def clean_text(self, text: str) -> str:
"""Clean and normalize text content"""
# Remove extra whitespace and normalize
lines = [line.strip() for line in text.splitlines()]
text = ' '.join(line for line in lines if line)
# Remove redundant spaces
text = ' '.join(text.split())
# Remove common navigation elements
nav_patterns = [
"skip to content",
"search",
"menu",
"navigation",
"subscribe",
"sign in",
"log in"
]
        for pattern in nav_patterns:
            # Match whole words case-insensitively so that longer words containing a
            # pattern (e.g. "research" contains "search") are left intact
            text = re.sub(rf"\b{re.escape(pattern)}\b", "", text, flags=re.IGNORECASE)
        # Collapse any extra whitespace left behind by the removals
        return ' '.join(text.split())
def extract_key_points(self, text: str, max_points: int = 5) -> List[str]:
"""Extract key points from text using the summarizer"""
try:
# Split text into chunks of ~1000 characters
chunks = [text[i:i + 1000] for i in range(0, len(text), 1000)]
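            # NOTE: chunking at fixed character offsets can split sentences mid-way;
            # it is only a rough heuristic to keep each input within the model's limit.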
all_points = []
for chunk in chunks[:3]: # Process first 3 chunks only
summary = self.model_manager.models['summarizer'](
chunk,
max_length=100,
min_length=30,
do_sample=False
)[0]['summary_text']
# Split into sentences and add as points
sentences = [s.strip() for s in summary.split('.') if s.strip()]
all_points.extend(sentences)
# Return unique points, limited to max_points
unique_points = list(dict.fromkeys(all_points))
return unique_points[:max_points]
except Exception as e:
logger.error(f"Error extracting key points: {str(e)}")
return []
def process_content(self, content: str) -> Dict:
"""Process content and generate insights"""
try:
# Clean the text
cleaned_text = self.clean_text(content)
# Extract key points
key_points = self.extract_key_points(cleaned_text)
# Generate a concise summary
summary = self.model_manager.models['summarizer'](
cleaned_text[:1024],
max_length=150,
min_length=50,
do_sample=False
)[0]['summary_text']
# Extract potential topics/keywords
topics = []
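            # NOTE: topic extraction here is plain substring matching against a fixed,
            # quantum-computing-specific keyword list; content outside this domain will
            # produce an empty topics list.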
common_topics = [
"quantum computing", "quantum processors", "quantum bits",
"quantum algorithms", "quantum supremacy", "quantum advantage",
"error correction", "quantum hardware", "quantum software",
"quantum research", "quantum applications"
]
for topic in common_topics:
if topic.lower() in cleaned_text.lower():
topics.append(topic)
return {
'summary': summary,
'key_points': key_points,
'topics': topics[:5], # Limit to top 5 topics
'content': cleaned_text
}
except Exception as e:
return {
'summary': f"Error processing content: {str(e)}",
'key_points': [],
'topics': [],
'content': content
}
class WebSearchEngine:
"""Main search engine class"""
def __init__(self):
self.processor = ContentProcessor()
self.session = requests.Session()
self.request_delay = 2.0
self.last_request_time = 0
self.max_retries = 3
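        # Politeness controls: enforce a minimum delay (with jitter, see safe_get)
        # between outbound requests and bound the number of retries per URL.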
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
def safe_get(self, url: str, max_retries: int = 3) -> requests.Response:
"""Make a GET request with retries and error handling"""
for i in range(max_retries):
try:
# Add delay between requests
current_time = time.time()
time_since_last = current_time - self.last_request_time
if time_since_last < self.request_delay:
time.sleep(self.request_delay - time_since_last + random.uniform(0.5, 1.5))
response = self.session.get(url, headers=self.headers, timeout=10)
self.last_request_time = time.time()
if response.status_code == 200:
return response
elif response.status_code == 429: # Rate limit
wait_time = (i + 1) * 5
time.sleep(wait_time)
continue
else:
response.raise_for_status()
except Exception as e:
if i == max_retries - 1:
raise
time.sleep((i + 1) * 2)
raise Exception(f"Failed to fetch URL after {max_retries} attempts")
def is_valid_url(self, url: str) -> bool:
"""Check if URL is valid for crawling"""
try:
parsed = urlparse(url)
return bool(parsed.netloc and parsed.scheme)
        except Exception:
return False
def get_metadata(self, soup: BeautifulSoup) -> Dict:
"""Extract metadata from page"""
        title = soup.title.get_text(strip=True) if soup.title else "No title"
        description = ""
        description_tag = soup.find("meta", attrs={"name": "description"})
        if description_tag:
            description = description_tag.get("content", "")
return {
'title': title,
'description': description
}
def process_url(self, url: str) -> Dict:
"""Process a single URL"""
if not self.is_valid_url(url):
return {'error': f"Invalid URL: {url}"}
try:
response = self.safe_get(url)
soup = BeautifulSoup(response.text, 'lxml')
# Extract text content
for script in soup(["script", "style"]):
script.decompose()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
content = ' '.join(chunk for chunk in chunks if chunk)
# Get metadata
metadata = self.get_metadata(soup)
# Process content
processed = self.processor.process_content(content)
return {
'url': url,
'title': metadata['title'],
'description': metadata['description'],
'summary': processed['summary'],
'key_points': processed['key_points'],
'topics': processed['topics'],
'content': processed['content']
}
except Exception as e:
return {'error': f"Error processing {url}: {str(e)}"}
def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]:
"""Search DuckDuckGo and parse HTML results"""
search_results = []
try:
# Encode query for URL
encoded_query = quote_plus(query)
# DuckDuckGo HTML search URL
search_url = f'https://html.duckduckgo.com/html/?q={encoded_query}'
# Get search results page
response = self.safe_get(search_url)
soup = BeautifulSoup(response.text, 'lxml')
# Find all result elements
results = soup.find_all('div', {'class': 'result'})
for result in results[:max_results]:
try:
# Extract link
link_elem = result.find('a', {'class': 'result__a'})
if not link_elem:
continue
link = link_elem.get('href', '')
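                    # NOTE: DuckDuckGo's HTML endpoint may return protocol-relative
                    # redirect links ("//duckduckgo.com/l/?uddg=..."); those fail
                    # is_valid_url below and are skipped rather than resolved.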
if not link or not self.is_valid_url(link):
continue
# Extract title
title = link_elem.get_text(strip=True)
# Extract snippet
snippet_elem = result.find('a', {'class': 'result__snippet'})
snippet = snippet_elem.get_text(strip=True) if snippet_elem else ""
search_results.append({
'link': link,
'title': title,
'snippet': snippet
})
# Add delay between processing results
time.sleep(random.uniform(0.2, 0.5))
except Exception as e:
logger.warning(f"Error processing search result: {str(e)}")
continue
return search_results
except Exception as e:
logger.error(f"Error during DuckDuckGo search: {str(e)}")
return []
def search(self, query: str, max_results: int = 5) -> Dict:
"""Perform search and process results"""
try:
# Search using DuckDuckGo HTML
search_results = self.search_duckduckgo(query, max_results)
if not search_results:
return {'error': 'No results found'}
results = []
all_key_points = []
all_topics = set()
for result in search_results:
if 'link' in result:
processed = self.process_url(result['link'])
if 'error' not in processed:
results.append(processed)
# Collect key points and topics
if 'key_points' in processed:
all_key_points.extend(processed['key_points'])
if 'topics' in processed:
all_topics.update(processed.get('topics', []))
time.sleep(random.uniform(0.5, 1.0))
if not results:
return {'error': 'Failed to process any search results'}
# Combine all summaries
all_summaries = " ".join([r['summary'] for r in results if 'summary' in r])
# Generate a meta-summary of all content
meta_summary = self.processor.model_manager.models['summarizer'](
all_summaries[:1024],
max_length=200,
min_length=100,
do_sample=False
)[0]['summary_text']
# Get unique key points
unique_key_points = list(dict.fromkeys(all_key_points))
            # Build follow-up questions from the extracted topics, falling back to
            # the original query when no topics were detected
            topic_phrase = ', '.join(list(all_topics)[:2]) or query
            return {
                'results': results,
                'insights': {
                    'summary': meta_summary,
                    'key_points': unique_key_points[:7],  # Top 7 key points
                    'topics': list(all_topics)[:5]  # Top 5 topics
                },
                'follow_up_questions': [
                    f"What are the recent breakthroughs in {topic_phrase}?",
                    f"How do these developments impact the future of {topic_phrase}?",
                    f"What are the practical applications of advances in {topic_phrase}?"
                ]
            }
except Exception as e:
return {'error': f"Search failed: {str(e)}"}
# Main search function
def search(query: str, max_results: int = 5) -> Dict:
"""Main search function"""
engine = WebSearchEngine()
return engine.search(query, max_results)
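

# Minimal usage sketch (illustrative only, not part of the original module): run a
# query from the command line and pretty-print the JSON result. Constructing the
# engine loads the summarization and embedding models, so the first call may be slow
# and may download model weights.
if __name__ == "__main__":
    import sys

    example_query = " ".join(sys.argv[1:]) or "quantum computing breakthroughs"
    print(json.dumps(search(example_query, max_results=3), indent=2))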