"""Web utilities for search, content extraction, and URL handling."""

import os
import re
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
import html2text
from typing import Optional, Dict, Tuple, List
import time

from tavily import TavilyClient
from config import TAVILY_API_KEY


tavily_client = None
if TAVILY_API_KEY:
    try:
        tavily_client = TavilyClient(api_key=TAVILY_API_KEY)
        print("[WebUtils] Tavily client initialized successfully")
    except Exception as e:
        print(f"[WebUtils] Failed to initialize Tavily client: {e}")
        tavily_client = None
else:
    print("[WebUtils] Tavily API key not found - web search will be unavailable")


class WebContentExtractor:
    """Handles web content extraction and processing"""

    @staticmethod
    def extract_website_content(url: str) -> str:
        """Extract HTML code and content from a website URL"""
        try:
            # Normalize the URL: add a scheme if missing and validate the host
            parsed_url = urlparse(url)
            if not parsed_url.scheme:
                url = "https://" + url
                parsed_url = urlparse(url)

            if not parsed_url.netloc:
                return "Error: Invalid URL provided"

            print(f"[WebExtract] Fetching content from: {url}")

            # Browser-like headers to reduce the chance of being blocked
            headers = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-Fetch-Dest': 'document',
                'Sec-Fetch-Mode': 'navigate',
                'Sec-Fetch-Site': 'none',
                'Sec-Fetch-User': '?1',
                'Cache-Control': 'max-age=0'
            }

            session = requests.Session()
            session.headers.update(headers)

            # Fetch the page, retrying with an alternate User-Agent on 403 responses
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    response = session.get(url, timeout=15, allow_redirects=True)
                    response.raise_for_status()
                    break
                except requests.exceptions.HTTPError as e:
                    if e.response.status_code == 403 and attempt < max_retries - 1:
                        session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                        continue
                    else:
                        raise

            # Decode the response, falling back to a lenient UTF-8 decode
            try:
                response.encoding = response.apparent_encoding
                raw_html = response.text
            except Exception:
                raw_html = response.content.decode('utf-8', errors='ignore')

            soup = BeautifulSoup(raw_html, 'html.parser')

            # Extract basic metadata
            title = soup.find('title')
            title_text = title.get_text().strip() if title else "No title found"

            meta_desc = soup.find('meta', attrs={'name': 'description'})
            description = meta_desc.get('content', '') if meta_desc else ""

            # Convert relative image URLs to absolute ones
            WebContentExtractor._fix_image_urls(soup, url)

            # Analyze content sections, navigation, and images
            content_info = WebContentExtractor._analyze_content(soup)

            modified_html = str(soup)

            # Clean and truncate the HTML so it fits in a prompt
            cleaned_html = WebContentExtractor._clean_html(modified_html)

            website_content = WebContentExtractor._format_website_analysis(
                url, title_text, description, content_info, cleaned_html
            )

            return website_content.strip()

        except requests.exceptions.HTTPError as e:
            return WebContentExtractor._handle_http_error(e, url)
        except requests.exceptions.Timeout:
            return "Error: Request timed out. The website may be slow or unavailable."
        except requests.exceptions.ConnectionError:
            return "Error: Could not connect to the website. Please check your internet connection and the URL."
        except requests.exceptions.RequestException as e:
            return f"Error accessing website: {str(e)}"
        except Exception as e:
            return f"Error extracting website content: {str(e)}"

    @staticmethod
    def _fix_image_urls(soup: BeautifulSoup, base_url: str):
        """Fix relative image URLs to absolute URLs"""
        img_elements = soup.find_all('img')
        for img in img_elements:
            src = img.get('src', '')
            if src:
                img['src'] = WebContentExtractor._make_absolute_url(src, base_url)

            # Handle lazy-loaded images that only provide data-src
            data_src = img.get('data-src', '')
            if data_src and not src:
                img['src'] = WebContentExtractor._make_absolute_url(data_src, base_url)

        # Rewrite background-image URLs in inline style attributes
        elements_with_style = soup.find_all(attrs={'style': True})
        for element in elements_with_style:
            style_attr = element.get('style', '')
            bg_pattern = r'background-image:\s*url\(["\']?([^"\']+)["\']?\)'
            matches = re.findall(bg_pattern, style_attr, re.IGNORECASE)
            for match in matches:
                if match:
                    absolute_bg = WebContentExtractor._make_absolute_url(match, base_url)
                    style_attr = style_attr.replace(match, absolute_bg)
            element['style'] = style_attr

        # Rewrite background-image URLs inside <style> blocks
        style_elements = soup.find_all('style')
        for style in style_elements:
            if style.string:
                style_content = style.string
                bg_pattern = r'background-image:\s*url\(["\']?([^"\']+)["\']?\)'
                matches = re.findall(bg_pattern, style_content, re.IGNORECASE)
                for match in matches:
                    if match:
                        absolute_bg = WebContentExtractor._make_absolute_url(match, base_url)
                        style_content = style_content.replace(match, absolute_bg)
                style.string = style_content

    @staticmethod
    def _make_absolute_url(url: str, base_url: str) -> str:
        """Convert relative URL to absolute URL"""
        if url.startswith('//'):
            return 'https:' + url
        elif url.startswith('/'):
            return urljoin(base_url, url)
        elif not url.startswith(('http://', 'https://')):
            return urljoin(base_url, url)
        return url
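
    # Illustrative behaviour of _make_absolute_url (example URLs are hypothetical):
    #   _make_absolute_url("/img/logo.png", "https://example.com/blog/post")  -> "https://example.com/img/logo.png"
    #   _make_absolute_url("//cdn.example.com/a.png", "https://example.com/") -> "https://cdn.example.com/a.png"
    #   _make_absolute_url("https://example.com/a.png", "https://example.com/") is returned unchanged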

    @staticmethod
    def _analyze_content(soup: BeautifulSoup) -> Dict:
        """Analyze website content and structure"""
        content_sections = []
        nav_links = []
        images = []

        # Collect substantial text from common main-content containers
        main_selectors = [
            'main', 'article', '.content', '.main-content', '.post-content',
            '#content', '#main', '.entry-content', '.post-body'
        ]

        for selector in main_selectors:
            elements = soup.select(selector)
            for element in elements:
                text = element.get_text().strip()
                if len(text) > 100:
                    content_sections.append(text)

        # Collect navigation links from <nav> and <header> elements
        nav_elements = soup.find_all(['nav', 'header'])
        for nav in nav_elements:
            links = nav.find_all('a')
            for link in links:
                link_text = link.get_text().strip()
                link_href = link.get('href', '')
                if link_text and link_href:
                    nav_links.append(f"{link_text}: {link_href}")

        # Collect image sources and alt text
        img_elements = soup.find_all('img')
        for img in img_elements:
            src = img.get('src', '')
            alt = img.get('alt', '')
            if src:
                images.append({'src': src, 'alt': alt})

        # Check which of the first few images are actually reachable
        working_images = []
        for img in images[:10]:
            if WebContentExtractor._test_image_url(img['src']):
                working_images.append(img)

        print(f"[WebExtract] Found {len(images)} images, {len(working_images)} working")

        return {
            'content_sections': content_sections,
            'nav_links': nav_links,
            'images': images,
            'working_images': working_images,
            'script_tags': len(soup.find_all('script'))
        }

    @staticmethod
    def _test_image_url(img_url: str) -> bool:
        """Test if image URL is accessible"""
        try:
            test_response = requests.head(img_url, timeout=5, allow_redirects=True)
            return test_response.status_code == 200
        except requests.RequestException:
            return False

    @staticmethod
    def _clean_html(html_content: str) -> str:
        """Clean and format HTML for better readability"""
        cleaned = re.sub(r'<!--.*?-->', '', html_content, flags=re.DOTALL)
        cleaned = re.sub(r'\s+', ' ', cleaned)
        cleaned = re.sub(r'>\s+<', '><', cleaned)

        if len(cleaned) > 15000:
            cleaned = cleaned[:15000] + "\n<!-- ... HTML truncated for length ... -->"

        return cleaned

    @staticmethod
    def _format_website_analysis(url: str, title: str, description: str,
                                 content_info: Dict, html: str) -> str:
        """Format comprehensive website analysis"""
        working_images = content_info['working_images']
        all_images = content_info['images']

        content = f"""
WEBSITE REDESIGN - ORIGINAL HTML CODE
=====================================

URL: {url}
Title: {title}
Description: {description}

PAGE ANALYSIS:
- Website type: {title.lower()} website
- Content sections: {len(content_info['content_sections'])}
- Navigation links: {len(content_info['nav_links'])}
- Total images: {len(all_images)}
- Working images: {len(working_images)}
- JavaScript complexity: {"High" if content_info['script_tags'] > 10 else "Low to Medium"}

WORKING IMAGES (use these URLs in your redesign):
{chr(10).join([f"• {img['alt'] or 'Image'} - {img['src']}" for img in working_images[:20]]) if working_images else "No working images found"}

ALL IMAGES (including potentially broken ones):
{chr(10).join([f"• {img['alt'] or 'Image'} - {img['src']}" for img in all_images[:20]]) if all_images else "No images found"}

ORIGINAL HTML CODE (use this as the base for redesign):
```html
{html}
```

REDESIGN INSTRUCTIONS:
Please redesign this website with a modern, responsive layout while:
1. Preserving all the original content and structure
2. Maintaining the same navigation and functionality
3. Using the original images and their URLs (listed above)
4. Creating a modern, clean design with improved typography and spacing
5. Making it fully responsive for mobile devices
6. Using modern CSS frameworks and best practices
7. Keeping the same semantic structure but with enhanced styling

IMPORTANT: All image URLs have been converted to absolute URLs and are ready to use.
Preserve these exact image URLs in your redesigned version.

The HTML code above contains the complete original website structure with all images properly linked.
Use it as your starting point and create a modernized version.
"""
        return content

    @staticmethod
    def _handle_http_error(error, url: str) -> str:
        """Handle HTTP errors with user-friendly messages"""
        response = getattr(error, 'response', None)
        status_code = response.status_code if response is not None else 0

        if status_code == 403:
            return "Error: Website blocked access (403 Forbidden). This website may have anti-bot protection. Try a different website or provide a description instead."
        elif status_code == 404:
            return "Error: Website not found (404). Please check the URL and try again."
        elif status_code >= 500:
            return f"Error: Website server error ({status_code}). Please try again later."
        else:
            return f"Error accessing website: HTTP {status_code} - {str(error)}"


class WebSearchEngine:
    """Handles web search operations using Tavily"""

    @staticmethod
    def perform_web_search(query: str, max_results: int = 5,
                           include_domains: Optional[List[str]] = None,
                           exclude_domains: Optional[List[str]] = None) -> str:
        """Perform web search using Tavily with advanced parameters"""
        if not tavily_client:
            return "Web search is not available. Please set the TAVILY_API_KEY environment variable."

        try:
            print(f"[WebSearch] Searching for: {query}")

            search_params = {
                "search_depth": "advanced",
                "max_results": min(max(1, max_results), 20),
                "include_answer": True,
                "include_raw_content": False
            }

            if include_domains:
                search_params["include_domains"] = include_domains
            if exclude_domains:
                search_params["exclude_domains"] = exclude_domains

            response = tavily_client.search(query, **search_params)

            search_results = []
            answer = response.get('answer', '')

            if answer:
                search_results.append(f"Direct Answer: {answer}\n")

            for result in response.get('results', []):
                title = result.get('title', 'No title')
                url = result.get('url', 'No URL')
                content = result.get('content', 'No content')
                score = result.get('score', 0)

                result_text = (
                    f"Title: {title}\n"
                    f"URL: {url}\n"
                    f"Relevance Score: {score:.2f}\n"
                    f"Content: {content}\n"
                )
                search_results.append(result_text)

            if search_results:
                final_results = "Web Search Results:\n\n" + "\n---\n".join(search_results)
                print(f"[WebSearch] Found {len(search_results)} results")
                return final_results
            else:
                return "No search results found."

        except Exception as e:
            error_msg = f"Search error: {str(e)}"
            print(f"[WebSearch] Error: {error_msg}")
            return error_msg

    @staticmethod
    def enhance_query_with_search(query: str, enable_search: bool) -> str:
        """Enhance the query with web search results if search is enabled"""
        if not enable_search or not tavily_client:
            return query

        print("[WebSearch] Enhancing query with web search")

        search_results = WebSearchEngine.perform_web_search(query, max_results=3)

        enhanced_query = f"""Original Query: {query}

{search_results}

Please use the search results above to help create the requested application with the most up-to-date information and best practices."""

        return enhanced_query


def parse_repo_or_model_url(url: str) -> Tuple[str, Optional[Dict]]:
    """Parse a URL and detect if it's a GitHub repo, HF Space, or HF Model"""
    try:
        parsed = urlparse(url.strip())
        netloc = (parsed.netloc or "").lower()
        path = (parsed.path or "").strip("/")

        # Hugging Face Space: huggingface.co/spaces/<username>/<project>
        if ("huggingface.co" in netloc or netloc.endswith("hf.co")) and path.startswith("spaces/"):
            parts = path.split("/")
            if len(parts) >= 3:
                return "hf_space", {"username": parts[1], "project": parts[2]}

        # Hugging Face model: huggingface.co/<org>/<model>
        if ("huggingface.co" in netloc or netloc.endswith("hf.co")) and not path.startswith(("spaces/", "datasets/", "organizations/")):
            parts = path.split("/")
            if len(parts) >= 2:
                repo_id = f"{parts[0]}/{parts[1]}"
                return "hf_model", {"repo_id": repo_id}

        # GitHub repository: github.com/<owner>/<repo>
        if "github.com" in netloc:
            parts = path.split("/")
            if len(parts) >= 2:
                return "github", {"owner": parts[0], "repo": parts[1]}

    except Exception:
        pass

    return "unknown", None
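
# Illustrative examples for parse_repo_or_model_url (hypothetical URLs):
#   "https://huggingface.co/spaces/someuser/some-app" -> ("hf_space", {"username": "someuser", "project": "some-app"})
#   "https://huggingface.co/someorg/some-model"       -> ("hf_model", {"repo_id": "someorg/some-model"})
#   "https://github.com/someowner/somerepo"           -> ("github", {"owner": "someowner", "repo": "somerepo"})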


def check_hf_space_url(url: str) -> Tuple[bool, Optional[str], Optional[str]]:
    """Check if URL is a valid Hugging Face Spaces URL and extract username/project"""
    # Space owner and project names may contain dots; allow an optional trailing slash.
    url_pattern = re.compile(
        r'^(https?://)?(huggingface\.co|hf\.co)/spaces/([\w.-]+)/([\w.-]+)/?$',
        re.IGNORECASE
    )

    match = url_pattern.match(url.strip())
    if match:
        username = match.group(3)
        project_name = match.group(4)
        return True, username, project_name
    return False, None, None


web_extractor = WebContentExtractor()
web_search = WebSearchEngine()


def extract_website_content(url: str) -> str:
    return web_extractor.extract_website_content(url)


def perform_web_search(query: str, max_results: int = 5) -> str:
    return web_search.perform_web_search(query, max_results)


def enhance_query_with_search(query: str, enable_search: bool) -> str:
    return web_search.enhance_query_with_search(query, enable_search)
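

# Minimal manual-test sketch: running this file directly exercises the pure URL
# helpers; the network-bound calls are left commented out because they need
# connectivity and, for search, a configured TAVILY_API_KEY. The URLs used here
# are placeholders, not known-good targets.
if __name__ == "__main__":
    print(parse_repo_or_model_url("https://huggingface.co/spaces/someuser/some-app"))
    print(check_hf_space_url("https://huggingface.co/spaces/someuser/some-app"))
    # print(extract_website_content("https://example.com")[:500])
    # print(perform_web_search("static site generators", max_results=3))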