| """ | |
| HTML Parser and URL Extractor component for web crawler | |
| """ | |
| import logging | |
| import re | |
| from typing import Dict, List, Set, Tuple, Optional, Any | |
| from urllib.parse import urlparse, urljoin, unquote | |
| from bs4 import BeautifulSoup | |
| import tldextract | |
| import hashlib | |
| import os | |
| from models import URL, Page, Priority, normalize_url | |
| import config | |
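
# config is expected to expose the attributes referenced in this module
# (LOG_LEVEL, LOG_FORMAT, URL_FILTERS, ALLOWED_SCHEMES, ALLOWED_DOMAINS,
# EXCLUDED_DOMAINS). The shapes below are inferred from how they are used
# here, not from config.py itself:
#   URL_FILTERS       - iterable of regex strings; URLs matching any pattern are dropped
#   ALLOWED_SCHEMES   - collection of scheme names, e.g. {"http", "https"}
#   ALLOWED_DOMAINS   - optional allow-list of registered domains (empty/None = allow all)
#   EXCLUDED_DOMAINS  - deny-list of registered domains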

# Configure logging
logging.basicConfig(
    level=getattr(logging, config.LOG_LEVEL),
    format=config.LOG_FORMAT
)
logger = logging.getLogger(__name__)


class HTMLParser:
    """
    Parses HTML content and extracts URLs and other information
    """

    def __init__(self):
        """Initialize HTML parser"""
        # Compile URL filter regex patterns for efficiency
        self.url_filters = [re.compile(pattern) for pattern in config.URL_FILTERS]

    def parse(self, page: Page, base_url: Optional[str] = None) -> Tuple[List[str], Dict[str, Any]]:
        """
        Parse HTML content and extract URLs and metadata

        Args:
            page: Page object containing HTML content
            base_url: Base URL for resolving relative links (defaults to page URL)

        Returns:
            Tuple of (extracted URLs, metadata)
        """
        if not page or not page.content:
            return [], {}

        # Use page URL as base URL if not provided
        if not base_url:
            base_url = page.url

        # Parse HTML content
        soup = BeautifulSoup(page.content, 'html.parser')

        # Extract URLs
        urls = self._extract_urls(soup, base_url)

        # Extract metadata
        metadata = self._extract_metadata(soup)

        return urls, metadata

    def _extract_urls(self, soup: BeautifulSoup, base_url: str) -> List[str]:
        """
        Extract and normalize URLs from HTML content

        Args:
            soup: BeautifulSoup object
            base_url: Base URL for resolving relative links

        Returns:
            List of normalized URLs
        """
        urls = set()
        all_urls = set()       # Track all URLs before filtering
        filtered_urls = set()  # Track filtered URLs

        logger.debug(f"Extracting URLs from page: {base_url}")

        # Extract URLs from <a> tags
        for link in soup.find_all('a', href=True):
            href = link['href'].strip()
            if href and not href.startswith(('#', 'javascript:', 'mailto:', 'tel:')):
                # Resolve relative URLs
                try:
                    absolute_url = urljoin(base_url, href)
                    all_urls.add(absolute_url)

                    # Normalize URL
                    normalized_url = normalize_url(absolute_url)

                    # Apply URL filters
                    if self._should_allow_url(normalized_url):
                        urls.add(normalized_url)
                    else:
                        filtered_urls.add(normalized_url)
                except Exception as e:
                    logger.debug(f"Error processing URL {href}: {e}")

        # Extract URLs from other elements like <iframe>, <frame>, <img>, etc.
        for tag_name, attr in [('frame', 'src'), ('iframe', 'src'), ('img', 'src'),
                               ('link', 'href'), ('script', 'src'), ('area', 'href')]:
            for tag in soup.find_all(tag_name, attrs={attr: True}):
                url = tag[attr].strip()
                if url and not url.startswith(('#', 'javascript:', 'data:', 'mailto:', 'tel:')):
                    try:
                        absolute_url = urljoin(base_url, url)
                        all_urls.add(absolute_url)
                        normalized_url = normalize_url(absolute_url)
                        if self._should_allow_url(normalized_url):
                            urls.add(normalized_url)
                        else:
                            filtered_urls.add(normalized_url)
                    except Exception as e:
                        logger.debug(f"Error processing URL {url}: {e}")

        # Log statistics
        logger.debug(f"Found {len(all_urls)} total URLs")
        logger.debug(f"Filtered {len(filtered_urls)} URLs")
        logger.debug(f"Accepted {len(urls)} URLs")

        # Log some example filtered URLs for debugging
        if filtered_urls:
            sample_filtered = list(filtered_urls)[:5]
            logger.debug(f"Sample filtered URLs: {sample_filtered}")

        # Return list of unique URLs
        return list(urls)

    def _should_allow_url(self, url: str) -> bool:
        """
        Check if URL should be allowed based on filters

        Args:
            url: URL to check

        Returns:
            True if URL should be allowed, False otherwise
        """
        try:
            parsed = urlparse(url)

            # Check scheme
            if parsed.scheme not in config.ALLOWED_SCHEMES:
                logger.debug(f"URL filtered - invalid scheme: {url}")
                return False

            # Check domain restrictions
            domain = self._extract_domain(url)

            # Check allowed domains if set
            if config.ALLOWED_DOMAINS and domain not in config.ALLOWED_DOMAINS:
                logger.debug(
                    f"URL filtered - domain not allowed: {url} "
                    f"(domain: {domain}, allowed: {config.ALLOWED_DOMAINS})"
                )
                return False

            # Check excluded domains
            if domain in config.EXCLUDED_DOMAINS:
                logger.debug(f"URL filtered - domain excluded: {url}")
                return False

            # Check URL filters
            for pattern in self.url_filters:
                if pattern.match(url):
                    logger.debug(f"URL filtered - pattern match: {url}")
                    return False

            return True
        except Exception as e:
            logger.debug(f"Error checking URL {url}: {e}")
            return False

    def _extract_metadata(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """
        Extract metadata from HTML content

        Args:
            soup: BeautifulSoup object

        Returns:
            Dictionary of metadata
        """
        metadata = {}

        # Extract title
        title_tag = soup.find('title')
        if title_tag and title_tag.string:
            metadata['title'] = title_tag.string.strip()

        # Extract meta description
        description_tag = soup.find('meta', attrs={'name': 'description'})
        if description_tag and description_tag.get('content'):
            metadata['description'] = description_tag['content'].strip()

        # Extract meta keywords
        keywords_tag = soup.find('meta', attrs={'name': 'keywords'})
        if keywords_tag and keywords_tag.get('content'):
            metadata['keywords'] = [k.strip() for k in keywords_tag['content'].split(',')]

        # Extract canonical URL
        canonical_tag = soup.find('link', attrs={'rel': 'canonical'})
        if canonical_tag and canonical_tag.get('href'):
            metadata['canonical_url'] = canonical_tag['href'].strip()

        # Extract robots meta
        robots_tag = soup.find('meta', attrs={'name': 'robots'})
        if robots_tag and robots_tag.get('content'):
            metadata['robots'] = robots_tag['content'].strip()

        # Extract Open Graph metadata
        og_metadata = {}
        for meta_tag in soup.find_all('meta', attrs={'property': re.compile('^og:')}):
            if meta_tag.get('content'):
                property_name = meta_tag['property'][3:]  # Remove 'og:' prefix
                og_metadata[property_name] = meta_tag['content'].strip()
        if og_metadata:
            metadata['open_graph'] = og_metadata

        # Extract Twitter Card metadata
        twitter_metadata = {}
        for meta_tag in soup.find_all('meta', attrs={'name': re.compile('^twitter:')}):
            if meta_tag.get('content'):
                property_name = meta_tag['name'][8:]  # Remove 'twitter:' prefix
                twitter_metadata[property_name] = meta_tag['content'].strip()
        if twitter_metadata:
            metadata['twitter_card'] = twitter_metadata

        # Extract schema.org structured data (JSON-LD)
        schema_metadata = []
        for script in soup.find_all('script', attrs={'type': 'application/ld+json'}):
            if script.string:
                try:
                    schema_data = json.loads(script.string)
                    schema_metadata.append(schema_data)
                except Exception as e:
                    logger.debug(f"Error parsing JSON-LD: {e}")
        if schema_metadata:
            metadata['structured_data'] = schema_metadata

        # Extract text content statistics
        text_content = soup.get_text(separator=' ', strip=True)
        if text_content:
            word_count = len(text_content.split())
            metadata['word_count'] = word_count
            metadata['text_length'] = len(text_content)

        return metadata
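
    # The metadata dict assembled above may contain any of: title, description,
    # keywords, canonical_url, robots, open_graph, twitter_card,
    # structured_data, word_count, and text_length. Every key is optional and
    # only present when the page provides the corresponding markup.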

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL"""
        parsed = tldextract.extract(url)
        return f"{parsed.domain}.{parsed.suffix}" if parsed.suffix else parsed.domain

    def calculate_priority(self, url: str, metadata: Dict[str, Any]) -> Priority:
        """
        Calculate priority for a URL based on various factors

        Args:
            url: URL to calculate priority for
            metadata: Metadata extracted from the page

        Returns:
            Priority enum value
        """
        # Default priority
        priority = Priority.MEDIUM

        try:
            # Extract path depth
            parsed = urlparse(url)
            path = parsed.path
            depth = len([p for p in path.split('/') if p])

            # Prioritize URLs with shorter paths
            if depth <= 1:
                priority = Priority.HIGH
            elif depth <= 3:
                priority = Priority.MEDIUM
            else:
                priority = Priority.LOW

            # Prioritize URLs with certain keywords in path
            if re.search(r'(article|blog|news|post)', path, re.IGNORECASE):
                priority = Priority.HIGH

            # Deprioritize URLs with pagination patterns; anchor on ?/& so that
            # parameters such as "top=5" are not mistaken for "p=5"
            if re.search(r'[?&](page|p|pg)=\d+', url, re.IGNORECASE):
                priority = Priority.LOW

            # Check metadata. The min() calls below rely on Priority being an
            # ordered enum where HIGH < MEDIUM < LOW numerically, so taking the
            # minimum raises priority but never lowers it.
            if metadata:
                # Prioritize based on title
                title = metadata.get('title', '')
                if title and len(title) > 10:
                    priority = min(priority, Priority.MEDIUM)  # Raise priority if it's lower

                # Prioritize based on description
                description = metadata.get('description', '')
                if description and len(description) > 50:
                    priority = min(priority, Priority.MEDIUM)  # Raise priority if it's lower

                # Prioritize based on word count
                word_count = metadata.get('word_count', 0)
                if word_count > 1000:
                    priority = min(priority, Priority.HIGH)  # High priority for content-rich pages
                elif word_count > 500:
                    priority = min(priority, Priority.MEDIUM)

            return priority
        except Exception as e:
            logger.debug(f"Error calculating priority for URL {url}: {e}")
            return Priority.MEDIUM
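

# Minimal usage sketch (illustrative only): drives the parser on an in-memory
# HTML snippet. The Page constructor call below assumes it accepts url= and
# content= keyword arguments; adjust to the actual definition in models.py.
if __name__ == "__main__":
    sample_html = """
    <html>
      <head>
        <title>Example article about crawling</title>
        <meta name="description" content="A short demo page used to exercise the parser.">
      </head>
      <body>
        <a href="/blog/post-1">Post 1</a>
        <a href="https://example.com/about">About</a>
      </body>
    </html>
    """

    parser = HTMLParser()
    page = Page(url="https://example.com/", content=sample_html)  # assumed signature
    urls, metadata = parser.parse(page)

    print("Title:", metadata.get("title"))
    for extracted in urls:
        print(extracted, parser.calculate_priority(extracted, metadata))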