import hashlib
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import urljoin, urlparse, urlunparse

import requests
from bs4 import BeautifulSoup, NavigableString, Tag


class ContentCrawler:
    def __init__(
        self,
        base_url: str,
        ignore_prefixes: Optional[List[str]] = None,
        max_length: int = 8000,
    ):
        """
        Initialize the crawler with the base URL, a list of URL prefixes to ignore,
        and the maximum chunk size.

        Args:
            base_url: The website URL to crawl.
            ignore_prefixes: List of URL path prefixes to ignore.
            max_length: Maximum allowed size for a chunk.
        """
        self.base_url = base_url
        self.visited = set()
        self.results = []
        self.max_length = max_length
        self.ignore_prefixes = ignore_prefixes or [
            "manage/",
            "password/",
            "media/",
            "notes-de-mises-a-jour/",
        ]
        # Used to detect and skip duplicate content
        self.content_hashes = set()

    def crawl(self) -> List[Dict[str, str]]:
        """
        Recursively crawl the website starting from the homepage.

        Returns:
            A list of dictionaries with keys 'url' and 'text' (in markdown format).
        """
        try:
            homepage_response = requests.get(self.base_url)
            homepage_response.raise_for_status()
        except Exception as e:
            print(f"Error fetching homepage {self.base_url}: {e}")
            return []

        homepage_soup = BeautifulSoup(homepage_response.text, "html.parser")
        initial_links = self._get_internal_links(homepage_soup)

        # Use a set to avoid duplicate URLs in the queue
        queue = set()
        for link in initial_links:
            full_url = self._normalize_url(urljoin(self.base_url, link))
            if full_url != self.base_url:
                queue.add(full_url)
                self.visited.add(full_url)

        # Convert to a list for processing
        queue_list = list(queue)
        while queue_list:
            current_url = queue_list.pop(0)
            print(f"Processing {current_url}")
            result, new_links = self._parse_page(current_url)
            if result:
                self.results.extend(result)

            # Only enqueue links that have not been visited yet
            for link in new_links:
                full_url = self._normalize_url(urljoin(self.base_url, link))
                if full_url not in self.visited and full_url != self.base_url:
                    self.visited.add(full_url)
                    queue_list.append(full_url)

        return self.results

    def _normalize_url(self, url: str) -> str:
        """Normalize a URL by stripping its fragment and query parameters."""
        parsed = urlparse(url)
        # Drop the fragment and query params
        normalized = parsed._replace(fragment="", query="")
        return urlunparse(normalized)

    def _get_internal_links(self, soup: BeautifulSoup) -> Set[str]:
        """
        Retrieve internal links from the BeautifulSoup object,
        ignoring those whose path starts with any of the specified prefixes.
        """
        links = set()
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            if href.startswith("#") or href.startswith("javascript:"):
                continue
            parsed_href = urlparse(href)
            path = parsed_href.path.lstrip("/")
            if any(path.startswith(prefix) for prefix in self.ignore_prefixes):
                continue
            # Make sure the link is internal
            is_internal = (
                not parsed_href.netloc
                or self.base_url in href
                or parsed_href.netloc == urlparse(self.base_url).netloc
            )
            if is_internal:
                links.add(href)
        return links

    def _parse_page(self, url: str) -> Tuple[List[Dict[str, str]], Set[str]]:
        """Parse a page and extract its content as well as its links."""
        try:
            response = requests.get(url)
            response.raise_for_status()
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return [], set()

        soup = BeautifulSoup(response.text, "html.parser")

        # Locate the main content div
        content_div = soup.find(id="content")
        if not content_div:
            print(f"No content div found in {url}")
            return [], self._get_internal_links(soup)

        # Clean up the content
        for script in content_div.find_all(["script", "style"]):
            script.decompose()

        # Grab the page title
        h1_tag = content_div.find("h1")
        page_title = h1_tag.get_text(strip=True) if h1_tag else ""

        # Build the full markdown document
        markdown_content = self._extract_structured_content(content_div, page_title)

        # Skip the page if its content is a duplicate
        content_hash = self._hash_content(markdown_content)
        if content_hash in self.content_hashes:
            print(f"Duplicate content skipped for {url}")
            return [], self._get_internal_links(soup)
        self.content_hashes.add(content_hash)

        # Split into chunks if necessary
        chunks = self._split_content(markdown_content)

        # Build the list of results
        results = []
        for i, chunk in enumerate(chunks):
            results.append({"url": f"{url}#chunk-{i+1}", "text": chunk})

        return results, self._get_internal_links(soup)

    def _extract_structured_content(self, content_div: Tag, page_title: str) -> str:
        """Extract the content in a structured way, preserving the heading hierarchy."""
        lines = []

        # Add the main title
        if page_title:
            lines.append(f"# {page_title}")

        # Walk the direct children of the content div, headings and content alike
        # (find(True, recursive=False) returns the first direct child tag)
        current_element = content_div.find(True, recursive=False)
        while current_element and current_element.parent == content_div:
            if current_element.name in ["h1", "h2", "h3", "h4", "h5", "h6"]:
                # Map the tag name to a markdown heading level
                level = int(current_element.name[1])
                text = current_element.get_text(strip=True)
                lines.append(f"{'#' * level} {text}")
            else:
                markdown = self._convert_element_to_markdown(current_element)
                if markdown:
                    lines.append(markdown)
            # Move to the next element at the same level
            current_element = current_element.find_next_sibling()

        return "\n\n".join(line for line in lines if line)

    def _convert_element_to_markdown(self, element) -> str:
        """Convert an HTML element to markdown."""
        if isinstance(element, NavigableString):
            text = element.strip()
            return text if text else ""

        if isinstance(element, Tag):
            if element.name in ["script", "style", "iframe"]:
                return ""
            if element.name == "p":
                return element.get_text(strip=True)
            elif element.name == "a" and element.get("href"):
                text = element.get_text(strip=True)
                href = element.get("href")
                return f"[{text}]({href})"
            elif element.name in ["ul", "ol"]:
                items = []
                for li in element.find_all("li", recursive=False):
                    text = li.get_text(strip=True)
                    if text:
                        items.append(f"* {text}")
                return "\n".join(items)
            elif element.name == "table":
                # Basic table extraction
                rows = []
                for tr in element.find_all("tr"):
                    cols = []
                    for td in tr.find_all(["td", "th"]):
                        cols.append(td.get_text(strip=True))
                    rows.append(" | ".join(cols))
                if rows:
                    # Add a separator line after the header row
                    if len(rows) > 1:
                        rows.insert(1, "-" * len(rows[0]))
                    return "\n".join(rows)
                return ""
            elif element.name in ["div", "section", "article"]:
                parts = []
                for child in element.children:
                    part = self._convert_element_to_markdown(child)
                    if part:
                        parts.append(part)
                return "\n\n".join(parts)
            else:
                text = element.get_text(strip=True)
                return text if text else ""

        return ""

    def _split_content(self, content: str) -> List[str]:
        """Split the content into chunks no larger than max_length."""
        if len(content) <= self.max_length:
            return [content]

        # Extract the main title so it can be preserved in every chunk
        lines = content.split("\n\n")
        main_title = lines[0] if lines and lines[0].startswith("# ") else ""

        chunks = []
        current_chunk = main_title
        current_length = len(main_title)

        for line in lines:
            # Skip the main title, it has already been handled
            if line == main_title:
                continue

            line_length = len(line)

            # If the line alone exceeds the max size, it has to be split
            if line_length > self.max_length:
                # First flush the current chunk if it holds any content
                if current_length > len(main_title):
                    chunks.append(current_chunk)
                # Split this long line into sub-parts
                start = 0
                while start < line_length:
                    part = line[start : start + self.max_length]
                    if main_title and not part.startswith("#"):
                        chunks.append(f"{main_title}\n\n{part}")
                    else:
                        chunks.append(part)
                    start += self.max_length
                # Reset the current chunk
                current_chunk = main_title
                current_length = len(main_title)
            else:
                # If adding this line would exceed the max size, start a new chunk
                if current_length + line_length + 4 > self.max_length:  # +4 for the "\n\n" separators
                    chunks.append(current_chunk)
                    current_chunk = main_title
                    current_length = len(main_title)
                    if main_title and current_chunk:
                        current_chunk += "\n\n"
                        current_length += 2

                # Append the line to the current chunk
                if current_chunk:
                    current_chunk += "\n\n" + line
                    current_length += line_length + 2
                else:
                    current_chunk = line
                    current_length = line_length

        # Append the last chunk if any content remains
        if current_length > len(main_title):
            chunks.append(current_chunk)

        return chunks

    def _hash_content(self, content: str) -> str:
        """Create a hash of the content so duplicates can be identified."""
        # Only the main content (not the URLs) is used for duplicate detection
        return hashlib.md5(content.encode()).hexdigest()
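

# --- Usage sketch (not part of the original module) ---------------------------
# A minimal example of how the crawler above might be driven. The target URL
# ("https://example.com/") and the output file name ("crawled_chunks.json") are
# illustrative assumptions only; swap in your own site and storage.
if __name__ == "__main__":
    import json

    crawler = ContentCrawler("https://example.com/", max_length=8000)
    chunks = crawler.crawl()
    print(f"Collected {len(chunks)} chunks from {len(crawler.visited)} visited URLs")

    # Persist the chunks so they can be embedded or indexed later.
    with open("crawled_chunks.json", "w", encoding="utf-8") as f:
        json.dump(chunks, f, ensure_ascii=False, indent=2)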