Spaces:
Build error
Build error
from typing import Dict, List, Any | |
import requests | |
from bs4 import BeautifulSoup | |
from transformers import pipeline | |
from langchain_community.embeddings import HuggingFaceEmbeddings | |
import time | |
import json | |
import os | |
from urllib.parse import urlparse, quote_plus | |
import logging | |
import random | |
logger = logging.getLogger(__name__) | |
class SearchResult: | |
def __init__(self, title: str, link: str, snippet: str): | |
self.title = title | |
self.link = link | |
self.snippet = snippet | |
class ModelManager: | |
"""Manages different AI models for specific tasks""" | |
def __init__(self): | |
self.device = "cpu" | |
self.models = {} | |
self.load_models() | |
def load_models(self): | |
# Use smaller models for CPU deployment | |
self.models['summarizer'] = pipeline( | |
"summarization", | |
model="facebook/bart-base", | |
device=self.device | |
) | |
self.models['embeddings'] = HuggingFaceEmbeddings( | |
model_name="sentence-transformers/all-MiniLM-L6-v2", | |
model_kwargs={"device": self.device} | |
) | |
class ContentProcessor: | |
"""Processes and analyzes different types of content""" | |
def __init__(self): | |
self.model_manager = ModelManager() | |
def process_content(self, content: str) -> Dict: | |
"""Process content and generate insights""" | |
try: | |
# Generate summary | |
summary = self.model_manager.models['summarizer']( | |
content[:1024], | |
max_length=100, | |
min_length=30, | |
do_sample=False | |
)[0]['summary_text'] | |
return { | |
'summary': summary, | |
'content': content | |
} | |
except Exception as e: | |
return { | |
'summary': f"Error processing content: {str(e)}", | |
'content': content | |
} | |
class WebSearchEngine: | |
"""Main search engine class""" | |
def __init__(self): | |
self.processor = ContentProcessor() | |
self.session = requests.Session() | |
self.request_delay = 2.0 | |
self.last_request_time = 0 | |
self.max_retries = 3 | |
self.headers = { | |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', | |
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8', | |
'Accept-Language': 'en-US,en;q=0.5', | |
'DNT': '1', | |
'Connection': 'keep-alive', | |
'Upgrade-Insecure-Requests': '1' | |
} | |
def safe_get(self, url: str, max_retries: int = 3) -> requests.Response: | |
"""Make a GET request with retries and error handling""" | |
for i in range(max_retries): | |
try: | |
# Add delay between requests | |
current_time = time.time() | |
time_since_last = current_time - self.last_request_time | |
if time_since_last < self.request_delay: | |
time.sleep(self.request_delay - time_since_last + random.uniform(0.5, 1.5)) | |
response = self.session.get(url, headers=self.headers, timeout=10) | |
self.last_request_time = time.time() | |
if response.status_code == 200: | |
return response | |
elif response.status_code == 429: # Rate limit | |
wait_time = (i + 1) * 5 | |
time.sleep(wait_time) | |
continue | |
else: | |
response.raise_for_status() | |
except Exception as e: | |
if i == max_retries - 1: | |
raise | |
time.sleep((i + 1) * 2) | |
raise Exception(f"Failed to fetch URL after {max_retries} attempts") | |
def is_valid_url(self, url: str) -> bool: | |
"""Check if URL is valid for crawling""" | |
try: | |
parsed = urlparse(url) | |
return bool(parsed.netloc and parsed.scheme) | |
except: | |
return False | |
def get_metadata(self, soup: BeautifulSoup) -> Dict: | |
"""Extract metadata from page""" | |
title = soup.title.string if soup.title else "No title" | |
description = "" | |
if soup.find("meta", attrs={"name": "description"}): | |
description = soup.find("meta", attrs={"name": "description"}).get("content", "") | |
return { | |
'title': title, | |
'description': description | |
} | |
def process_url(self, url: str) -> Dict: | |
"""Process a single URL""" | |
if not self.is_valid_url(url): | |
return {'error': f"Invalid URL: {url}"} | |
try: | |
response = self.safe_get(url) | |
soup = BeautifulSoup(response.text, 'lxml') | |
# Extract text content | |
for script in soup(["script", "style"]): | |
script.decompose() | |
text = soup.get_text() | |
lines = (line.strip() for line in text.splitlines()) | |
chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
content = ' '.join(chunk for chunk in chunks if chunk) | |
# Get metadata | |
metadata = self.get_metadata(soup) | |
# Process content | |
processed = self.processor.process_content(content) | |
return { | |
'url': url, | |
'title': metadata['title'], | |
'description': metadata['description'], | |
'summary': processed['summary'], | |
'content': processed['content'] | |
} | |
except Exception as e: | |
return {'error': f"Error processing {url}: {str(e)}"} | |
def search_duckduckgo(self, query: str, max_results: int = 5) -> List[Dict]: | |
"""Search DuckDuckGo and parse HTML results""" | |
search_results = [] | |
try: | |
# Encode query for URL | |
encoded_query = quote_plus(query) | |
# DuckDuckGo HTML search URL | |
search_url = f'https://html.duckduckgo.com/html/?q={encoded_query}' | |
# Get search results page | |
response = self.safe_get(search_url) | |
soup = BeautifulSoup(response.text, 'lxml') | |
# Find all result elements | |
results = soup.find_all('div', {'class': 'result'}) | |
for result in results[:max_results]: | |
try: | |
# Extract link | |
link_elem = result.find('a', {'class': 'result__a'}) | |
if not link_elem: | |
continue | |
link = link_elem.get('href', '') | |
if not link or not self.is_valid_url(link): | |
continue | |
# Extract title | |
title = link_elem.get_text(strip=True) | |
# Extract snippet | |
snippet_elem = result.find('a', {'class': 'result__snippet'}) | |
snippet = snippet_elem.get_text(strip=True) if snippet_elem else "" | |
search_results.append({ | |
'link': link, | |
'title': title, | |
'snippet': snippet | |
}) | |
# Add delay between processing results | |
time.sleep(random.uniform(0.2, 0.5)) | |
except Exception as e: | |
logger.warning(f"Error processing search result: {str(e)}") | |
continue | |
return search_results | |
except Exception as e: | |
logger.error(f"Error during DuckDuckGo search: {str(e)}") | |
return [] | |
def search(self, query: str, max_results: int = 5) -> Dict: | |
"""Perform search and process results""" | |
try: | |
# Search using DuckDuckGo HTML | |
search_results = self.search_duckduckgo(query, max_results) | |
if not search_results: | |
return {'error': 'No results found'} | |
results = [] | |
for result in search_results: | |
if 'link' in result: | |
processed = self.process_url(result['link']) | |
if 'error' not in processed: | |
results.append(processed) | |
time.sleep(random.uniform(0.5, 1.0)) | |
if not results: | |
return {'error': 'Failed to process any search results'} | |
# Generate insights from results | |
all_content = " ".join([r['summary'] for r in results if 'summary' in r]) | |
return { | |
'results': results, | |
'insights': all_content[:1000] if all_content else "No insights available.", | |
'follow_up_questions': [ | |
f"What are the key differences between {query} and related topics?", | |
f"Can you explain {query} in simple terms?", | |
f"What are the latest developments in {query}?" | |
] | |
} | |
except Exception as e: | |
return {'error': f"Search failed: {str(e)}"} | |
# Main search function | |
def search(query: str, max_results: int = 5) -> Dict: | |
"""Main search function""" | |
engine = WebSearchEngine() | |
return engine.search(query, max_results) | |